Skip to content

Commit

Permalink
add stream
Browse files Browse the repository at this point in the history
  • Loading branch information
jhnnsrs committed Jul 16, 2024
1 parent f1378f3 commit cb66424
Show file tree
Hide file tree
Showing 12 changed files with 824 additions and 4 deletions.
38 changes: 38 additions & 0 deletions contrib/builders/livekitio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import TYPE_CHECKING


if TYPE_CHECKING:

from fakts.base_models import LinkingContext
from contrib.backends.docker_backend import DockerServiceDescriptor, SelfServiceDescriptor


def _create_base_url(self: "SelfServiceDescriptor", context: "LinkingContext", descriptor: "DockerServiceDescriptor", inside_port="80/tcp"):
try:
outside_port = descriptor.port_map[inside_port]
except KeyError:
raise Exception(f"Service {descriptor.internal_host} does not expose port {inside_port} only exposes ports: " + str(descriptor.port_map.keys()))

protocol = "http" #TODO: Until livekit supports custom certificates
inside_base_url = f"{protocol}://{descriptor.internal_host}:{inside_port.split('/')[0]}"
outside_base_url = f"{protocol}://{context.request.host}:{outside_port}"

# Depending on how the service is accessed, we need to return the correct base_url
if context.request.host == self.internal_host:
base_url = inside_base_url
else:
base_url = outside_base_url

return base_url



def livekit(self: "SelfServiceDescriptor", context: "LinkingContext", descriptor: "DockerServiceDescriptor"):

base_url = _create_base_url(self, context, descriptor, inside_port="7880/tcp")



return { "endpoint_url": base_url }


4 changes: 2 additions & 2 deletions fakts/graphql/mutations/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ekke.types import Info

from fakts import enums, inputs, models, scalars, types
from fakts.base_models import DevelopmentClientConfig, Manifest
from fakts.base_models import DevelopmentClientConfig, Manifest, Requirement
from fakts.builders import create_client
from fakts.models import Composition

Expand Down Expand Up @@ -39,7 +39,7 @@ def create_developmental_client(info: Info, input: inputs.DevelopmentClientInput
version=input.manifest.version,
logo=input.manifest.logo,
scopes=input.manifest.scopes or [],
requirements=[],
requirements={x.key: Requirement(service=x.service, optional=x.optional, description=x.description) for x in input.requirements},
)

client = create_client(
Expand Down
1 change: 1 addition & 0 deletions kammer/graphql/mutations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .comment import *
from .room import *
from .stream import *
171 changes: 171 additions & 0 deletions kammer/graphql/mutations/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import hashlib
import json
import logging
from typing import Any, Dict, List, Tuple

import strawberry
import strawberry_django
from ekke.types import Info
from kammer import enums, inputs, models, scalars, types

logger = logging.getLogger(__name__)
from django.contrib.auth import get_user_model
from kammer import inputs
from livekit import api
from livekit.protocol.room import ListRoomsRequest
from django.conf import settings

@strawberry.input
class CreateStreamInput:
room: strawberry.ID
title: str | None = None
agent_id: str | None = None


async def create_video_stream(info: Info, input: CreateStreamInput) -> types.Stream:

room = await models.Room.objects.aget(id=input.room)


agent, _ = await models.Agent.objects.aget_or_create(
user=info.context.request.user,
app=info.context.request.app,
room=room,
name=input.agent_id,
)


# Check if room exists.

print(settings.LIVEKIT)

lkapi = api.LiveKitAPI(
url=settings.LIVEKIT["API_URL"],
api_key=settings.LIVEKIT["API_KEY"],
api_secret=settings.LIVEKIT["API_SECRET"],
)

reponse = await lkapi.room.list_rooms(ListRoomsRequest(names=[room.streamlit_room_id]))

if reponse.rooms:
room_info = reponse.rooms[0]
print(room_info)

else:
room_info = await lkapi.room.create_room(
api.CreateRoomRequest(name=room.streamlit_room_id),
)




token = api.AccessToken(api_key=settings.LIVEKIT["API_KEY"],
api_secret=settings.LIVEKIT["API_SECRET"]) \
.with_identity("agent-" + str(agent.id)) \
.with_name("agent-" + agent.name) \
.with_grants(api.VideoGrants(
room_join=True,
room=room.streamlit_room_id,
)).to_jwt()

print(token)

exp, _ = await models.Stream.objects.aupdate_or_create(
title=input.title or "default",
agent=agent,
defaults=dict(token=token)
)



return exp


@strawberry.input
class JoinStreamInput:
id: strawberry.ID


async def join_video_stream(info: Info, input: JoinStreamInput) -> types.Stream:
creator = info.context.request.user

room = await models.Room.objects.aget(id=input.room)


agent, _ = await models.Agent.objects.get_or_create(
user=info.context.request.user,
app=info.context.request.app,
room=room,
name=input.agent_id,
)

token = api.AccessToken() \
.with_identity(agent.id) \
.with_name(agent.name) \
.with_grants(api.VideoGrants(
room_join=True,
room=room.streamlit_room_id,
)).to_jwt()

exp = await models.Stream.objects.acreate(
title=input.title or "Untitled",
agent=agent,
token=token
)



return exp


@strawberry.input
class LeaveStreamInput:
id: strawberry.ID


async def leave_video_stream(info: Info, input: LeaveStreamInput) -> types.Stream:

exp = await models.Stream.objects.aget(id=input.id)

i


await exp.delete()

return exp


@strawberry.input
class StructureInput:
object: strawberry.ID
identifier: str


@strawberry.input
class SendMessageInput:
room: strawberry.ID
agent_id: str
text: str
parent: strawberry.ID | None = None
notify: bool | None = None
attach_structures: List[StructureInput] | None = None


def send(info: Info, input: SendMessageInput) -> types.Message:

agent, _ = models.Agent.objects.get_or_create(
user=info.context.request.user,
app=info.context.request.app,
room_id=input.room,
name=input.agent_id,
)

message = models.Message.objects.create(agent=agent, text=input.text)
if input.attach_structures:
for structure in input.attach_structures:
structure, _ = models.Structure.objects.get_or_create(
object=structure.object, identifier=structure.identifier
)
message.attached_structures.add(structure)

return message
48 changes: 48 additions & 0 deletions kammer/migrations/0003_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Generated by Django 4.2.5 on 2024-07-16 07:35

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
dependencies = [
("kammer", "0002_alter_agent_app"),
]

operations = [
migrations.CreateModel(
name="Stream",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"title",
models.CharField(
help_text="The Title of the Stream", max_length=1000
),
),
(
"token",
models.CharField(
help_text="The token of the stream", max_length=4000
),
),
(
"agent",
models.ForeignKey(
help_text="The agent that created this stream",
on_delete=django.db.models.deletion.CASCADE,
related_name="streams",
to="kammer.agent",
),
),
],
),
]
18 changes: 18 additions & 0 deletions kammer/migrations/0004_stream_unique_stream_for_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.5 on 2024-07-16 13:34

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("kammer", "0003_stream"),
]

operations = [
migrations.AddConstraint(
model_name="stream",
constraint=models.UniqueConstraint(
fields=("agent", "title"), name="Unique stream for agent"
),
),
]
28 changes: 28 additions & 0 deletions kammer/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class Room(models.Model):
@property
def messages(self):
return Message.objects.filter(agent__room=self).all()


@property
def streamlit_room_id(self):
return f"lok-room-{self.id}"


class Agent(models.Model):
Expand All @@ -47,6 +52,29 @@ class Agent(models.Model):
)


class Stream(models.Model):
agent = models.ForeignKey(
"Agent",
on_delete=models.CASCADE,
related_name="streams",
help_text="The agent that created this stream",
)
title = models.CharField(max_length=1000, help_text="The Title of the Stream")
token = models.CharField(max_length=4000, help_text="The token of the stream")

class Meta:
constraints = [
models.UniqueConstraint(
fields=["agent", "title"], name="Unique stream for agent"
)
]







class Message(models.Model):
"""
Message represent the message of an agent on a room
Expand Down
29 changes: 29 additions & 0 deletions kammer/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class Room:
title: str
description: str
messages: list["Message"]
agents: list["Agent"]

@strawberry_django.field
def streams(self, info: Info) -> list["Stream"]:
return models.Stream.objects.filter(agent__room=self).all()


@strawberry_django.type(models.Agent, pagination=True)
Expand All @@ -34,10 +39,34 @@ class Agent:
room: Room







@strawberry_django.type(models.Stream, pagination=True)
class Stream:
id: strawberry.ID
agent: Agent
title: str

@strawberry.field
def token(self, info: Info) -> str:
return self.token







@strawberry_django.type(models.Message, pagination=True, filters=filters.MessageFilter)
class Message:
id: strawberry.ID
title: str
text: str
agent: Agent
attached_structures: list[Structure]



Loading

0 comments on commit cb66424

Please sign in to comment.