Files
JARVIS/backend/app/agents/tools/time_reasoning.py

270 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import re
from datetime import UTC, date, datetime, time, timedelta
from langchain_core.tools import tool
_WEEKDAY_MAP = {"": 0, "": 1, "": 2, "": 3, "": 4, "": 5, "": 6, "": 6}
_DEFAULT_HOUR_BY_PERIOD = {
"morning": 9,
"noon": 12,
"afternoon": 15,
"evening": 20,
}
_TIME_KEYWORDS = ("今天", "明天", "后天", "本周", "这周", "下周", "", "星期", "", "", "早上", "上午", "中午", "下午", "晚上", "今晚", "", ":", "")
def _parse_datetime(value: str) -> datetime:
normalized = value.strip().replace("Z", "+00:00")
return datetime.fromisoformat(normalized)
def extract_reference_datetime(current_datetime_context: str | None) -> datetime:
context = (current_datetime_context or "").strip()
if context:
for pattern in (r"current_time_utc:\s*(\S+)", r"CURRENT_TIME:\s*(\S+)", r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2}))"):
match = re.search(pattern, context)
if match:
return _parse_datetime(match.group(1))
return datetime.now(UTC)
def _normalize_local_iso(value: datetime) -> str:
return value.replace(tzinfo=None).isoformat(timespec="seconds")
def _normalize_datetime_iso(value: datetime) -> str:
if value.tzinfo is not None:
return value.isoformat(timespec="seconds")
return _normalize_local_iso(value)
def _normalize_date_iso(value: date) -> str:
return value.isoformat()
def _is_iso_datetime(value: str) -> bool:
try:
parsed = _parse_datetime(value)
except ValueError:
return False
return isinstance(parsed, datetime)
def _is_iso_date(value: str) -> bool:
try:
date.fromisoformat(value.strip())
return True
except ValueError:
return False
def _has_explicit_time(text: str) -> bool:
return bool(
re.search(r"\d{1,2}[:]\d{2}", text)
or re.search(r"\d{1,2}点(?:半|(?:\d{1,2})分?)?", text)
or any(keyword in text for keyword in ("早上", "上午", "中午", "下午", "晚上", "今晚"))
)
def _detect_period(text: str) -> str | None:
if any(keyword in text for keyword in ("晚上", "今晚")):
return "evening"
if "下午" in text:
return "afternoon"
if "中午" in text:
return "noon"
if any(keyword in text for keyword in ("早上", "上午", "早晨", "清晨")):
return "morning"
return None
def _resolve_time(text: str) -> tuple[time, bool, str | None]:
period = _detect_period(text)
colon_match = re.search(r"(\d{1,2})[:](\d{2})", text)
if colon_match:
hour = int(colon_match.group(1))
minute = int(colon_match.group(2))
if period in {"afternoon", "evening"} and hour < 12:
hour += 12
return time(hour=hour, minute=minute), False, period
half_match = re.search(r"(\d{1,2})点半", text)
if half_match:
hour = int(half_match.group(1))
if period in {"afternoon", "evening"} and hour < 12:
hour += 12
return time(hour=hour, minute=30), False, period
dot_match = re.search(r"(\d{1,2})点(?:(\d{1,2})分?)?", text)
if dot_match:
hour = int(dot_match.group(1))
minute = int(dot_match.group(2) or 0)
if period in {"afternoon", "evening"} and hour < 12:
hour += 12
if period == "noon" and hour < 11:
hour += 12
return time(hour=hour, minute=minute), False, period
if period:
return time(hour=_DEFAULT_HOUR_BY_PERIOD[period], minute=0), True, period
return time(hour=9, minute=0), True, None
def _resolve_date(text: str, reference: datetime) -> tuple[date, str]:
stripped = text.strip()
if _is_iso_date(stripped):
return date.fromisoformat(stripped), "explicit_date"
month_day_match = re.search(r"(\d{1,2})月(\d{1,2})日", stripped)
if month_day_match:
month = int(month_day_match.group(1))
day = int(month_day_match.group(2))
candidate = date(reference.year, month, day)
if candidate < reference.date() - timedelta(days=1):
candidate = date(reference.year + 1, month, day)
return candidate, "explicit_month_day"
if "后天" in stripped:
return reference.date() + timedelta(days=2), "relative_day"
if "明天" in stripped:
return reference.date() + timedelta(days=1), "relative_day"
if "今天" in stripped:
return reference.date(), "relative_day"
weekday_match = re.search(r"((?:本周|这周|下周)?)(?:周|星期)([一二三四五六日天])", stripped)
if weekday_match:
prefix = weekday_match.group(1)
weekday = _WEEKDAY_MAP[weekday_match.group(2)]
current_weekday = reference.date().weekday()
delta = weekday - current_weekday
if prefix == "下周":
delta += 7 if delta <= 0 else 7
elif prefix in {"本周", "这周"}:
if delta < 0:
delta += 7
elif delta < 0:
delta += 7
return reference.date() + timedelta(days=delta), "relative_weekday"
return reference.date(), "reference_day"
def resolve_time_expression_data(
expression: str,
*,
current_datetime_context: str | None = None,
prefer: str = "datetime",
) -> dict:
text = (expression or "").strip()
if not text:
raise ValueError("expression 不能为空")
reference = extract_reference_datetime(current_datetime_context)
if _is_iso_datetime(text):
parsed = _parse_datetime(text)
return {
"expression": text,
"reference_time": reference.isoformat(),
"grain": "datetime",
"resolved_date": _normalize_date_iso(parsed.date()),
"resolved_datetime": _normalize_datetime_iso(parsed),
"assumed_time": False,
"reason": "explicit_datetime",
}
if _is_iso_date(text):
parsed_date = date.fromisoformat(text)
return {
"expression": text,
"reference_time": reference.isoformat(),
"grain": "date",
"resolved_date": _normalize_date_iso(parsed_date),
"resolved_datetime": None,
"assumed_time": False,
"reason": "explicit_date",
}
resolved_date, date_reason = _resolve_date(text, reference)
resolved_time, assumed_time, period = _resolve_time(text)
has_explicit_time = _has_explicit_time(text)
grain = "date" if prefer == "date" and not has_explicit_time else "datetime"
resolved_dt = datetime.combine(resolved_date, resolved_time)
note = date_reason
if period:
note = f"{note}:{period}"
if assumed_time:
note = f"{note}:assumed_time"
return {
"expression": text,
"reference_time": reference.isoformat(),
"grain": grain,
"resolved_date": _normalize_date_iso(resolved_date),
"resolved_datetime": None if grain == "date" else _normalize_local_iso(resolved_dt),
"assumed_time": assumed_time,
"reason": note,
}
@tool
def resolve_time_expression(
expression: str,
current_datetime_context: str = "",
prefer: str = "datetime",
) -> str:
"""解析中文自然语言时间表达,基于当前参考时间返回明确的日期或 datetime。prefer 支持 datetime/date。"""
try:
payload = resolve_time_expression_data(
expression,
current_datetime_context=current_datetime_context or None,
prefer=prefer,
)
return json.dumps(payload, ensure_ascii=False)
except Exception as exc:
return json.dumps(
{
"expression": expression,
"error": str(exc),
},
ensure_ascii=False,
)
def normalize_tool_time_arguments(tool_name: str, args: dict, current_datetime_context: str | None) -> dict:
normalized = dict(args)
if tool_name == "create_reminder":
raw_value = next((normalized.get(key) for key in ("reminder_at", "datetime", "at", "remind_at", "time") if isinstance(normalized.get(key), str) and normalized.get(key).strip()), None)
if raw_value and not _is_iso_datetime(raw_value):
payload = resolve_time_expression_data(raw_value, current_datetime_context=current_datetime_context, prefer="datetime")
normalized["reminder_at"] = payload["resolved_datetime"]
return normalized
if tool_name in {"create_schedule_task", "create_task"}:
raw_value = next((normalized.get(key) for key in ("due_date", "date") if isinstance(normalized.get(key), str) and normalized.get(key).strip()), None)
if raw_value and not _is_iso_datetime(raw_value) and not _is_iso_date(raw_value):
prefer = "datetime" if tool_name == "create_schedule_task" or _has_explicit_time(raw_value) else "date"
payload = resolve_time_expression_data(raw_value, current_datetime_context=current_datetime_context, prefer=prefer)
normalized["due_date"] = payload["resolved_datetime"] or payload["resolved_date"]
return normalized
if tool_name in {"create_todo", "create_goal", "get_schedule_day"}:
field_name = {
"create_todo": "todo_date",
"create_goal": "goal_date",
"get_schedule_day": "target_date",
}[tool_name]
raw_value = normalized.get(field_name)
if isinstance(raw_value, str) and raw_value.strip() and not _is_iso_date(raw_value):
payload = resolve_time_expression_data(raw_value, current_datetime_context=current_datetime_context, prefer="date")
normalized[field_name] = payload["resolved_date"]
return normalized
return normalized
__all__ = ["resolve_time_expression", "resolve_time_expression_data", "normalize_tool_time_arguments", "extract_reference_datetime"]