Expose the real-time internal state of the batcher through SSE #3065
@@ -112,6 +136,10 @@ impl Backend for BackendV3 {
            .is_ok()
    }

    fn events(&self) -> BroadcastReceiver<EngineState> {
        self.state_events.0.subscribe()
You should send the current state as soon as a new connection is opened (that is, on every new subscribe).
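One way to picture this request is the std-only sketch below. It is not the PR's code: `StateEvents`, `publish`, the `u32` type of `in_queue`, and the use of `std::sync::mpsc` in place of the broadcast channel are all assumptions made for illustration. The point it shows is that `subscribe` pushes a snapshot of the current state to the new receiver before any later update arrives.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Mutex, RwLock};

/// Simplified stand-in for the PR's `EngineState`; only `in_queue` appears in
/// the diff, and its integer type is an assumption.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EngineState {
    pub in_queue: u32,
}

/// Hypothetical holder pairing the current state with its subscribers.
pub struct StateEvents {
    current: RwLock<EngineState>,
    subscribers: Mutex<Vec<Sender<EngineState>>>,
}

impl StateEvents {
    pub fn new(initial: EngineState) -> Self {
        Self {
            current: RwLock::new(initial),
            subscribers: Mutex::new(Vec::new()),
        }
    }

    /// Registers a subscriber and immediately sends it the current state.
    pub fn subscribe(&self) -> Receiver<EngineState> {
        let (tx, rx) = channel();
        // Hold the subscriber list lock while taking the snapshot, so a
        // concurrent publish is either reflected in the snapshot or reaches
        // this subscriber afterwards.
        let mut subscribers = self.subscribers.lock().unwrap();
        let snapshot = *self.current.read().unwrap();
        // The receiver is still alive here, so this send cannot fail.
        tx.send(snapshot).expect("receiver is held by this function");
        subscribers.push(tx);
        rx
    }

    /// Replaces the state and forwards it to every live subscriber.
    pub fn publish(&self, new_state: EngineState) {
        *self.current.write().unwrap() = new_state;
        // Drop subscribers whose receiving end has gone away.
        self.subscribers
            .lock()
            .unwrap()
            .retain(|tx| tx.send(new_state).is_ok());
    }
}
```

With this shape, a client that connects while the queue is idle still receives one event describing the present state instead of waiting for the next change.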
let stream = infer.events();
let sse =
    Sse::new(BroadcastStream::from(stream).map(|state| {
        Event::default().json_data(state.map_err(|err| axum::Error::new(err))?)
Name of the event?
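For context on this question: in the Server-Sent Events wire format, a frame may carry an `event:` field, and a browser `EventSource` dispatches frames without one as generic `message` events. The sketch below formats such a frame by hand using only the standard library. The helper `sse_frame` and the event name `engine_state` are hypothetical; in axum the same field would presumably be set with `Event::default().event(...)` before `json_data`.

```rust
/// Formats one Server-Sent Events frame with an explicit event name.
/// Hypothetical helper for illustration; it assumes `event_name` contains no
/// line breaks.
pub fn sse_frame(event_name: &str, data: &str) -> String {
    let mut frame = String::new();
    frame.push_str("event: ");
    frame.push_str(event_name);
    frame.push('\n');
    // Each line of the payload needs its own `data:` field.
    for line in data.lines() {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    // A blank line terminates the frame.
    frame.push('\n');
    frame
}
```

A client could then listen with `addEventListener("engine_state", ...)` rather than relying on the default `message` handler.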
// Dispatch new state to the proxy
{
    // Lock free operation (read)
    let num_queued_tokens = engine_state.read().await.in_queue;
    {
        // Critical section, doing as little as possible here
        let mut engine_state = engine_state.write().await;
        engine_state.in_queue = num_queued_tokens + entry_num_tokens;
    }

    // Send new state to the channel for broadcasting
    if let Err(err) = queue_events.send(*engine_state.read().await) {
        tracing::warn!(
            "Failed to send BatchEvent::QueueChanged({}): {err}",
            num_queued_tokens + entry_num_tokens
        )
    }
}
- // Dispatch new state to the proxy
- {
-     // Lock free operation (read)
-     let num_queued_tokens = engine_state.read().await.in_queue;
-     {
-         // Critical section, doing as little as possible here
-         let mut engine_state = engine_state.write().await;
-         engine_state.in_queue = num_queued_tokens + entry_num_tokens;
-     }
-     // Send new state to the channel for broadcasting
-     if let Err(err) = queue_events.send(*engine_state.read().await) {
-         tracing::warn!(
-             "Failed to send BatchEvent::QueueChanged({}): {err}",
-             num_queued_tokens + entry_num_tokens
-         )
-     }
- }
+ engine_state.modify(|state| state.in_queue += entry_num_tokens);
The `modify` function would apply the change and then send the new state over SSE.
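A minimal sketch of what such a `modify` helper could look like, using only the standard library. The wrapper type `SharedEngineState`, its `subscribe` method, the `u32` field type, and the `mpsc` channels standing in for the broadcast channel are assumptions, not the PR's implementation. The update runs under a single write lock, which also closes the gap in the quoted snippet where `in_queue` is read under one lock and written under another.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Mutex, RwLock};

/// Simplified stand-in for the PR's `EngineState`; the field type is assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EngineState {
    pub in_queue: u32,
}

/// Hypothetical wrapper that owns both the state and its subscribers.
pub struct SharedEngineState {
    state: RwLock<EngineState>,
    subscribers: Mutex<Vec<Sender<EngineState>>>,
}

impl SharedEngineState {
    pub fn new(initial: EngineState) -> Self {
        Self {
            state: RwLock::new(initial),
            subscribers: Mutex::new(Vec::new()),
        }
    }

    /// Registers a receiver for future state changes.
    pub fn subscribe(&self) -> Receiver<EngineState> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Applies `f` under one write lock, then broadcasts the resulting state.
    pub fn modify<F>(&self, f: F) -> EngineState
    where
        F: FnOnce(&mut EngineState),
    {
        // Critical section: mutate and copy out a snapshot.
        let snapshot = {
            let mut guard = self.state.write().unwrap();
            f(&mut *guard);
            *guard
        };
        // Broadcast after releasing the write lock; forget subscribers whose
        // receiver has been dropped.
        self.subscribers
            .lock()
            .unwrap()
            .retain(|tx| tx.send(snapshot).is_ok());
        snapshot
    }
}
```

The call site then reduces to the one-liner from the suggestion, and every state change is broadcast from a single place.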