-
-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
following to Feature Request: Implementing Persistent Speaker Embeddings Across Conversations #227 #228
base: main
Are you sure you want to change the base?
following to Feature Request: Implementing Persistent Speaker Embeddings Across Conversations #227 #228
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
from pyannote.metrics.diarization import DiarizationErrorRate | ||
from rx.core import Observer | ||
from typing_extensions import Literal | ||
import json | ||
|
||
|
||
class WindowClosedException(Exception): | ||
|
@@ -55,6 +56,46 @@ def on_error(self, error: Exception): | |
def on_completed(self): | ||
self.patch() | ||
|
||
###### | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove blank comment |
||
|
||
class RedisWriter(Observer): | ||
def __init__(self, uri: Text, redis_client, patch_collar: float = 0.05): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
super().__init__() | ||
self.uri = uri | ||
self.redis_client = redis_client | ||
self.conversation_id = uri # Assuming URI as a unique identifier for the conversation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This attribute is not needed. |
||
self.patch_collar = patch_collar | ||
|
||
def on_next(self, value: Union[Tuple, Annotation]): | ||
if isinstance(value, tuple): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
prediction, _ = value[:2] | ||
# Process each segment in the prediction | ||
for segment, _, label in prediction.itertracks(yield_label=True): | ||
# Update last centroids for each speaker | ||
|
||
# Write data to Redis queues | ||
diarization_data = { | ||
'start': segment.start, | ||
'end': segment.end, | ||
'speaker_id': label, | ||
} | ||
if len(value)==3: | ||
diarization_data['centroids'] = value[-1].tolist() | ||
|
||
self.redis_client.rpush(f'diarization_{self.conversation_id}', json.dumps(diarization_data)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not name this "diarization". We could also be writing the output of a VAD pipeline here |
||
|
||
else: | ||
prediction = value | ||
|
||
def on_error(self, error: Exception): | ||
# Handle error (optional) | ||
pass | ||
|
||
def on_completed(self): | ||
# Handle completion (optional) | ||
pass | ||
|
||
####### | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove blank comment |
||
|
||
class PredictionAccumulator(Observer): | ||
def __init__(self, uri: Optional[Text] = None, patch_collar: float = 0.05): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to return centroids with each output? In order to have persistent centroids it would be enough to have a way of extracting them after inference and a way of setting them before inference. I think the best way to do this would be to expose getters and setters.
The thing is I don't want the RedisWriter to be exclusively for diarization. I would like for it to be compatible with VAD and in the future with transcription too.
Adding things at the end of each output is not a sustainable solution. With time, we could end up having huge tuples. An alternative would be to have different output objects per pipeline, like
DiarizationOutput
andVADOutput
, but I want all sinks to be able to handle outputs seamlessly so they should remain at least similar enough. This would also require big changes to all inference and sink objects.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, we need to have setnroings ducing inference in the real time application. Obviously, every iteration is overkill
My usecase is about global speakers identification in real - time environment, which means I want to obtain speaker embeddings at some points for vector store lookup. After inference is not enough. As for now, I think good idea is to obtain a centroid for each new speaker after it's well - converged. Getting embeddings at each iteration obviously is overkill, but that worka for me at this point, as I can keep only the last state of centroids downstream. Maybe, a better idea is to develop an on-demand method for that, but I am pretty new to real time application and not sure how to do it right. Exposing getters looks good to me, but I am not sure how to use them