改进 流式输出

This commit is contained in:
Twilight 2024-07-25 02:53:47 +08:00
parent 5887413644
commit 01ca320e06
10 changed files with 214 additions and 81 deletions

View File

@ -9,6 +9,7 @@
use App\Repositories\LLM\HumanMessage; use App\Repositories\LLM\HumanMessage;
use GuzzleHttp\Exception\GuzzleException; use GuzzleHttp\Exception\GuzzleException;
use Illuminate\Console\Command; use Illuminate\Console\Command;
use Illuminate\Support\Facades\Storage;
class TestLLM extends Command class TestLLM extends Command
{ {
@ -48,6 +49,12 @@ public function handle()
$q = $this->ask('请输入问题'); $q = $this->ask('请输入问题');
if ($q == 'q') {
Storage::put('chat.json', json_encode($history->getMessages(), JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
return 0;
}
if (empty($q)) { if (empty($q)) {
$q = '北京天气'; $q = '北京天气';
} }
@ -66,7 +73,10 @@ public function handle()
} }
} elseif ($item->role == ChatEnum::AssistantChunk) { } elseif ($item->role == ChatEnum::AssistantChunk) {
echo $item->getLastAppend(); echo $item->getLastAppend();
} elseif ($item->role == ChatEnum::Assistant) {
echo "\n完整输出: ".$item->content;
} }
} }
echo "\n"; echo "\n";

View File

@ -3,20 +3,29 @@
namespace App\Http\Controllers\Api; namespace App\Http\Controllers\Api;
use App\Http\Controllers\Controller; use App\Http\Controllers\Controller;
use App\LLM\Qwen;
use App\Models\Chat; use App\Models\Chat;
use App\Repositories\LLM\ChatEnum;
use App\Repositories\LLM\History;
use App\Repositories\LLM\HumanMessage;
use Illuminate\Http\Request; use Illuminate\Http\Request;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Str; use Illuminate\Support\Str;
class ChatHistoryController extends Controller class ChatHistoryController extends Controller
{ {
private string $stream_prefix_id = 'chat_stream_id:';
private string $stream_random_id_prefix = 'chat_stream_random_id:';
/** /**
* Display a listing of the resource. * Display a listing of the resource.
*/ */
public function index(Chat $chat) public function index(Chat $chat)
{ {
return $this->success( return $this->success(
$chat->histories()->get() $chat->histories()->latest()->get()
); );
} }
@ -27,17 +36,16 @@ public function store(Request $request, Chat $chat)
{ {
// 获取上一条记录 // 获取上一条记录
$last_history = $chat->histories()->orderBy('id', 'desc')->first(); $last_history = $chat->histories()->orderBy('id', 'desc')->first();
// 如果存在 // 如果存在
if ($last_history) { if ($last_history) {
// 如果上一条是 user // 如果上一条是 user
if ($last_history->role == 'user') { if ($last_history->role == ChatEnum::Human) {
// 不允许发送消息 // 不允许发送消息
return $this->badRequest('你已经回复过了,请等待 AI 响应。'); return $this->badRequest('你已经回复过了,请等待 AI 响应。');
} }
// 检查缓存是否存在 // 检查缓存是否存在
$last_stream_key = 'chat_history_id:'.$last_history->id; $last_stream_key = $this->stream_prefix_id.$chat->id;
// 如果存在 // 如果存在
if (Cache::has($last_stream_key)) { if (Cache::has($last_stream_key)) {
@ -49,25 +57,102 @@ public function store(Request $request, Chat $chat)
'message' => 'required', 'message' => 'required',
]); ]);
$chat->histories()->create([ $history = $chat->histories()->create([
'content' => $request->input('message'), 'content' => $request->input('message'),
'role' => 'user', 'role' => ChatEnum::Human,
]); ]);
// 随机生成一个 ID
$random_id = Str::random(20); $random_id = Str::random(20);
$last_stream_key = 'chat_history_id:'.$last_history->id; // 缓存 key
// 设置缓存 $history_stream_key = $this->stream_prefix_id.$chat->id;
Cache::put($last_stream_key, $random_id, 60); Cache::put($history_stream_key, $random_id, 60);
Cache::put("chat_history_stream_id:$random_id", $last_history->id, 60); Cache::put($this->stream_random_id_prefix.$random_id, $chat->id, 60);
return $this->success([ return $this->success([
'stream_url' => route('chat-stream', $random_id), 'stream_url' => route('chat-stream', $random_id),
]); ]);
} }
/**
* @throws \Exception
*/
public function stream(string $stream_id) public function stream(string $stream_id)
{ {
return $stream_id; $cached = Cache::get($this->stream_random_id_prefix.$stream_id);
if (! $cached) {
return $this->badRequest('无效的流 ID。');
}
$chat = Chat::with('assistant.tools', 'assistant')->findOrFail($cached);
$histories = $chat->histories()->get();
// 如果为 0
if ($histories->count() == 0) {
return $this->badRequest('请先发送消息。');
}
$llm_histories = new History();
$llm_histories->setHistory($histories->toArray());
$llm = new Qwen();
$history = new History();
$tools = $chat->assistant->tools()->get();
$llm->setTools($tools);
$llm->setHistory($history);
$last_human_message = $histories[$histories->count() - 1];
$history->addMessage(new HumanMessage($last_human_message));
$stream = $llm->streamResponse();
$response = response()->stream(function () use (&$stream, &$chat) {
// 循环输出
foreach ($stream as $item) {
$data = [];
if ($item->role == ChatEnum::Tool) {
if ($item->processing) {
$data = [
'message' => '正在执行: '.$item->content,
];
} else {
$data = [
'message' => '结果: '.$item->content,
];
}
} elseif ($item->role == ChatEnum::AssistantChunk) {
$data = [
'message' => $item->getLastAppend(),
];
} elseif ($item->role == ChatEnum::Assistant) {
$chat->histories()->create([
'content' => $item->content,
'role' => ChatEnum::Assistant,
]);
Log::info('AI: '.$item->content);
}
echo 'data: '.json_encode($data, JSON_UNESCAPED_UNICODE)."\n\n";
ob_flush();
flush();
}
});
Cache::forget($this->stream_random_id_prefix.$stream_id);
Cache::forget($this->stream_prefix_id.$chat->id);
$response->headers->set('Content-Type', 'text/event-stream');
$response->headers->set('Cache-Control', 'no-cache');
$response->headers->set('Connection', 'keep-alive');
return $response;
} }
} }

View File

@ -9,8 +9,10 @@
use App\Repositories\LLM\ToolRequestMessage; use App\Repositories\LLM\ToolRequestMessage;
use App\Repositories\LLM\ToolResponseMessage; use App\Repositories\LLM\ToolResponseMessage;
use GuzzleHttp\Client; use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException;
use GuzzleHttp\Exception\GuzzleException; use GuzzleHttp\Exception\GuzzleException;
use Illuminate\Database\Eloquent\Collection; use Illuminate\Database\Eloquent\Collection;
use Illuminate\Support\Facades\Log;
class Qwen implements BaseLLM class Qwen implements BaseLLM
{ {
@ -26,7 +28,9 @@ class Qwen implements BaseLLM
const END_OF_MESSAGE = "/\r\n\r\n|\n\n|\r\r/"; const END_OF_MESSAGE = "/\r\n\r\n|\n\n|\r\r/";
private bool $request_again = false; private bool $retry = false;
private int $retires = 0;
public function __construct() public function __construct()
{ {
@ -76,8 +80,8 @@ public function streamResponse()
$url = config('services.dashscope.api_base').'/compatible-mode/v1/chat/completions'; $url = config('services.dashscope.api_base').'/compatible-mode/v1/chat/completions';
$this->request_again = true; $this->retry = true;
while ($this->request_again) { while ($this->retry) {
$ai_chunk_message = new AIChunkMessage(''); $ai_chunk_message = new AIChunkMessage('');
@ -88,10 +92,19 @@ public function streamResponse()
'tools' => $this->tool_functions, 'tools' => $this->tool_functions,
]; ];
$response = $client->request('POST', $url, [ try {
'body' => json_encode($request_body), $response = $client->request('POST', $url, [
'stream' => true, 'body' => json_encode($request_body),
]); 'stream' => true,
]);
} catch (ClientException $e) {
Log::error($e->getMessage());
Log::debug('http body', [$e->getResponse()->getBody()->getContents()]);
// 保存 history
Log::debug('chat history', $this->history->getMessages());
return;
}
$body = $response->getBody(); $body = $response->getBody();
@ -120,15 +133,27 @@ public function streamResponse()
if (empty($d)) { if (empty($d)) {
return; return;
} }
// log
$event = json_decode($d, true); $event = json_decode($d, true);
Log::debug('event', $event);
if (isset($event['choices'][0])) { if (isset($event['choices'][0])) {
$finish_reason = $event['choices'][0]['finish_reason'];
// 检测是不是 stop // 检测是不是 stop
if ($event['choices'][0]['finish_reason'] == 'stop') { if ($finish_reason == 'stop') {
$ai_chunk_message->processing = false; $ai_chunk_message->processing = false;
$this->history->addMessage($ai_chunk_message->toAIMessage()); $ai_message = $ai_chunk_message->toAIMessage();
$this->history->addMessage($ai_message);
yield $ai_message;
Log::debug('stop!');
break; break;
// yield $ai_chunk_message; } elseif ($finish_reason == 'tool_calls') {
$this->retry = true;
Log::debug('finished_reason is tool call, set retry');
} }
$delta = $event['choices'][0]['delta']; $delta = $event['choices'][0]['delta'];
@ -140,62 +165,54 @@ public function streamResponse()
$info = $info['properties']; $info = $info['properties'];
} }
yield new ToolRequestMessage( // 开始调用
$tool_req = new ToolRequestMessage(
content: $this->tool_info['function']['name'], content: $this->tool_info['function']['name'],
); );
$r = $this->callTool($this->tool_info['function']['name'], $info); yield $tool_req;
// $tool_response = [ Log::debug('tool req', [$tool_req]);
// 'name' => $this->tool_info['function']['name'],
// 'role' => 'tool',
// 'content' => $r,
// ];
$tool_call_message = new AIToolCallMessage(content: ''); $tool_call_message = new AIToolCallMessage(content: '');
$tool_call_message->tool_calls = [ $tool_call_message->tool_calls = [
$this->tool_info, $this->tool_info,
]; ];
// 调用
$r = $this->callTool($this->tool_info['function']['name'], $info);
$tool_response_message = new ToolResponseMessage(content: $r); $tool_response_message = new ToolResponseMessage(content: $r);
$tool_response_message->name = $this->tool_info['function']['name']; $tool_response_message->name = $this->tool_info['function']['name'];
$this->history->addMessage($tool_call_message); $this->history->addMessage($tool_call_message);
$this->history->addMessage($tool_response_message); $this->history->addMessage($tool_response_message);
Log::debug('tool call message', [$tool_call_message]);
Log::debug('tool response', [$tool_response_message]);
yield $tool_call_message; yield $tool_call_message;
yield $tool_response_message; yield $tool_response_message;
//
//
//
// $this->history[] = $tool_response;
// yield new ToolResponseMessage(
// content: $tool_response_message->content,
// );
// 清空参数,以备二次调用 // 清空参数,以备二次调用
$this->clearToolInfo(); $this->clearToolInfo();
// 再次请求以获取结果
$this->request_again = true;
} elseif (isset($delta['tool_calls'])) { } elseif (isset($delta['tool_calls'])) {
// 更新 tool_info // 更新 tool_info
$call_info = $delta['tool_calls'][0]; $call_info = $delta['tool_calls'][0];
if (isset($call_info['id'])) { if (! empty($call_info['id'])) {
$this->tool_info['id'] = $call_info['id']; $this->tool_info['id'] = $call_info['id'];
} }
if (isset($call_info['index'])) { if (! empty($call_info['index'])) {
$this->tool_info['index'] = $call_info['index']; $this->tool_info['index'] = $call_info['index'];
} }
if (isset($call_info['function']['name'])) { if (! empty($call_info['function']['name'])) {
$this->tool_info['function']['name'] = $call_info['function']['name']; $this->tool_info['function']['name'] = $call_info['function']['name'];
} }
if (isset($call_info['function']['arguments'])) { if (! empty($call_info['function']['arguments'])) {
$this->tool_info['function']['arguments'] .= $call_info['function']['arguments']; $this->tool_info['function']['arguments'] .= $call_info['function']['arguments'];
} }
} elseif (empty($delta['choices'][0]['finish_reason'])) { } elseif (empty($delta['choices'][0]['finish_reason'])) {
@ -204,9 +221,11 @@ public function streamResponse()
$ai_chunk_message->processing = true; $ai_chunk_message->processing = true;
yield $ai_chunk_message; yield $ai_chunk_message;
Log::debug('chunk', [$ai_chunk_message]);
} }
$this->request_again = false; $this->retry = false;
} }
} }
@ -217,6 +236,8 @@ public function streamResponse()
} }
$this->retry = false;
} }
private function callTool($tool_name, $args): string private function callTool($tool_name, $args): string

View File

@ -10,12 +10,17 @@ class ChatHistory extends Model
protected $fillable = [ protected $fillable = [
'chat_id', 'chat_id',
'content', 'content',
'tool_calls',
'role', 'role',
'input_tokens', 'input_tokens',
'output_tokens', 'output_tokens',
'total_tokens', 'total_tokens',
]; ];
protected $casts = [
'tool_calls' => 'array',
];
public function chat(): BelongsTo public function chat(): BelongsTo
{ {
return $this->belongsTo(Chat::class); return $this->belongsTo(Chat::class);

View File

@ -4,7 +4,7 @@
class AIToolCallMessage extends BaseMessage class AIToolCallMessage extends BaseMessage
{ {
public ChatEnum $role = ChatEnum::Assistant; public ChatEnum $role = ChatEnum::AssistantToolCall;
public array $tool_calls; public array $tool_calls;
} }

View File

@ -6,6 +6,8 @@ enum ChatEnum: string
{ {
case Assistant = 'assistant'; case Assistant = 'assistant';
case AssistantChunk = 'assistant_chunk'; case AssistantChunk = 'assistant_chunk';
case AssistantToolCall = 'assistant_tool_call';
case Tool = 'tool'; case Tool = 'tool';
case Human = 'user'; case Human = 'user';
case System = 'system';
} }

View File

@ -2,6 +2,8 @@
namespace App\Repositories\LLM; namespace App\Repositories\LLM;
use Exception;
class History class History
{ {
protected array $history = []; protected array $history = [];
@ -16,9 +18,33 @@ public function getMessages(): array
return $this->history; return $this->history;
} }
/**
* @throws Exception
*/
public function setHistory(array $history): void public function setHistory(array $history): void
{ {
// foreach ($history as $h) {
// // 转换
// $role = match ($h['role']) {
// 'user' => ChatEnum::Human,
// 'assistant' => ChatEnum::Assistant,
// 'system' => ChatEnum::System,
// 'tool' => ChatEnum::Tool,
// default => throw new Exception('Unknown role: '.$h['role']),
// };
//
// $this->history[] = match ($role) {
// ChatEnum::Human => new HumanMessage($h['content']),
// ChatEnum::Assistant => new AIMessage($h['content']),
// ChatEnum::System => new SystemMessage($h['content']),
// ChatEnum::Tool => new ToolMessage($h['content']),
// ChatEnum::AssistantChunk => throw new \Exception('To be implemented'),
// };
// }
$this->history = $history; $this->history = $history;
// dd($this->history);
} }
public function clearHistory(): void public function clearHistory(): void
@ -31,9 +57,18 @@ public function getForApi(): array
$history = []; $history = [];
foreach ($this->history as $h) { foreach ($this->history as $h) {
// map roles
$role = match ($h->role) {
// ChatEnum::Human => 'user',
// ChatEnum::Assistant => 'assistant',
// ChatEnum::System => 'system',
// ChatEnum::Tool => 'tool',
ChatEnum::AssistantToolCall => ChatEnum::Assistant,
default => $h->role,
};
$a = [ $a = [
'role' => $h->role->value, 'role' => $role->value,
'content' => $h->content, 'content' => $h->content,
]; ];

View File

@ -0,0 +1,8 @@
<?php
namespace App\Repositories\LLM;
class SystemMessage extends BaseMessage
{
public ChatEnum $role = ChatEnum::System;
}

View File

@ -9,4 +9,6 @@ class ToolResponseMessage extends BaseMessage
public bool $requesting = false; public bool $requesting = false;
public string $name; public string $name;
public string $id;
} }

View File

@ -1,35 +0,0 @@
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class extends Migration
{
/**
* Run the migrations.
*/
public function up(): void
{
Schema::create('cache', function (Blueprint $table) {
$table->string('key')->primary();
$table->mediumText('value');
$table->integer('expiration');
});
Schema::create('cache_locks', function (Blueprint $table) {
$table->string('key')->primary();
$table->string('owner');
$table->integer('expiration');
});
}
/**
* Reverse the migrations.
*/
public function down(): void
{
Schema::dropIfExists('cache');
Schema::dropIfExists('cache_locks');
}
};