diff --git a/backends/llamacpp/src/backend.rs b/backends/llamacpp/src/backend.rs index 09afbc7b..5262bd8a 100644 --- a/backends/llamacpp/src/backend.rs +++ b/backends/llamacpp/src/backend.rs @@ -18,7 +18,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::TryAcquireError; use tokio::time::Instant; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{error, info}; +use tracing::{debug, error, info}; type BoxedOpaqueStream = Box; @@ -113,7 +113,7 @@ fn llama_generate_callback( }, top_tokens: vec![], }; - println!("Generated token: {new_token_id} -> logits={new_token_logit}, is_eos={is_eos}"); + debug!("Generated token: {new_token_id} -> logits={new_token_logit}, is_eos={is_eos}"); unsafe { if let Err(ref err) = (*channel).0.send(Ok(response)) { @@ -121,7 +121,7 @@ fn llama_generate_callback( "Failed to send back token to the client: {}", err.to_string() ); - } + }; } } @@ -131,6 +131,7 @@ unsafe fn scheduler_loop( ) { loop { if let Ok(mut ctx) = backlog.recv() { + let start = Instant::now(); let stream = BoxedOpaqueStream::new(OpaqueStream(ctx.stream)); let stream_ptr = Box::into_raw(stream); let result = backend.pin_mut().stream( @@ -143,7 +144,7 @@ unsafe fn scheduler_loop( ); // Make sure we re-keep track of the OpaqueStream box - let _ = Box::from_raw(stream_ptr); + let stream = Box::from_raw(stream_ptr); match result { Ok(n_tokens) => { @@ -151,12 +152,27 @@ unsafe fn scheduler_loop( ctx.generated_tokens.set_len(n_tokens); } - println!( - "Generated {} tokens -> {:?}", - n_tokens, &ctx.generated_tokens - ); + let _ = stream.0.send(Ok(InferStreamResponse::End { + token: Token { + id: ctx.generated_tokens[n_tokens - 1], + text: "".to_string(), + logprob: 0.0, + special: false, + }, + top_tokens: vec![], + generated_text: GeneratedText { + text: "".to_string(), + generated_tokens: n_tokens as u32, + finish_reason: FinishReason::Length, + seed: Some(ctx.sampling_params.seed), + }, + start, + queued: start, + })); + + debug!("Generated {n_tokens} tokens -> {:?}", ctx.generated_tokens); } - Err(err) => println!("Error: {}", err), + Err(err) => println!("Error: {err}"), } } else { info!("IPC channel is closed, exiting the scheduler loop");