fix: harden streaming chat persistence and access control

Persist streaming chat state during generator cleanup, close the SSE inner stream safely, and reject cross-user conversation access while locking the behavior with focused regressions.
This commit is contained in:
2026-04-02 21:49:53 +08:00
parent 4251a79062
commit b3f9b5e715
3 changed files with 1232 additions and 86 deletions

View File

@@ -130,34 +130,42 @@ async def chat_stream(
agent_svc = AgentService(db)
async def stream_generator():
stream = None
msg_id = None
should_emit_done = False
try:
conv_id, msg_id, stream = await agent_svc.chat(
user_id=current_user.id,
message=data.message,
conversation_id=data.conversation_id,
file_ids=data.file_ids,
model_name=data.model_name,
)
except ValueError as exc:
yield f"event: error\ndata: {json.dumps({'error': str(exc)}, ensure_ascii=False)}\n\n"
return
try:
conv_id, msg_id, stream = await agent_svc.chat(
user_id=current_user.id,
message=data.message,
conversation_id=data.conversation_id,
file_ids=data.file_ids,
model_name=data.model_name,
)
except ValueError as exc:
yield f"event: error\ndata: {json.dumps({'error': str(exc)}, ensure_ascii=False)}\n\n"
return
yield f"event: metadata\ndata: {json.dumps({'conversation_id': conv_id, 'message_id': msg_id})}\n\n"
yield f"event: metadata\ndata: {json.dumps({'conversation_id': conv_id, 'message_id': msg_id})}\n\n"
try:
async for event in stream:
event_type = event.get('type', 'progress')
if event_type == 'chunk':
yield f"event: chunk\ndata: {json.dumps({'content': event.get('content', '')}, ensure_ascii=False)}\n\n"
elif event_type == 'error':
yield f"event: error\ndata: {json.dumps({'error': event.get('error', '未知错误')}, ensure_ascii=False)}\n\n"
else:
payload = {k: v for k, v in event.items() if k != 'type'}
yield f"event: progress\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
except Exception as e:
yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
try:
async for event in stream:
event_type = event.get('type', 'progress')
if event_type == 'chunk':
yield f"event: chunk\ndata: {json.dumps({'content': event.get('content', '')}, ensure_ascii=False)}\n\n"
elif event_type == 'error':
yield f"event: error\ndata: {json.dumps({'error': event.get('error', '未知错误')}, ensure_ascii=False)}\n\n"
else:
payload = {k: v for k, v in event.items() if k != 'type'}
yield f"event: progress\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
except Exception as e:
yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
should_emit_done = msg_id is not None
if should_emit_done:
yield f"event: done\ndata: {json.dumps({'message_id': msg_id})}\n\n"
finally:
yield f"event: done\ndata: {json.dumps({'message_id': msg_id})}\n\n"
if stream is not None:
await stream.aclose()
return StreamingResponse(
stream_generator(),