Skip to content

Commit a70db19

Browse files
authored
Add gRPC support to Vanilla (#16)
1 parent 5b6a085 commit a70db19

File tree

10 files changed

+239
-20
lines changed

10 files changed

+239
-20
lines changed

latte/build.gradle.kts

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
plugins {
22
`java-library`
3-
kotlin("jvm") version "1.9.20"
4-
id("com.google.devtools.ksp") version "1.9.20-1.0.14"
3+
kotlin("jvm") version "2.0.21"
4+
id("com.google.devtools.ksp") version "2.0.21-1.0.25"
55
}
66

77
group = "gg.beemo.latte"
@@ -10,7 +10,7 @@ version = "1.0.0"
1010
dependencies {
1111

1212
// Kotlin
13-
val kotlinCoroutinesVersion = "1.7.3"
13+
val kotlinCoroutinesVersion = "1.9.0"
1414
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
1515
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinCoroutinesVersion")
1616

@@ -24,18 +24,18 @@ dependencies {
2424
implementation("com.rabbitmq:amqp-client:$rabbitVersion")
2525

2626
// JSON
27-
val moshiVersion = "1.14.0"
27+
val moshiVersion = "1.15.1"
2828
implementation("com.squareup.moshi:moshi:$moshiVersion")
2929
ksp("com.squareup.moshi:moshi-kotlin-codegen:$moshiVersion")
3030

3131
// Misc
3232
implementation("org.jetbrains:annotations:24.1.0")
33-
val log4jVersion = "2.22.0"
33+
val log4jVersion = "2.24.1"
3434
compileOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
3535
testImplementation("org.apache.logging.log4j:log4j-core:$log4jVersion")
3636

3737
// JUnit testing framework
38-
val junitVersion = "5.10.1"
38+
val junitVersion = "5.11.2"
3939
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
4040
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")
4141

latte/src/main/resources/log4j2.xml

+3
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,8 @@
1515
<Logger name="org.apache.kafka" level="WARN">
1616
<AppenderRef ref="Console" />
1717
</Logger>
18+
<Logger name="io.grpc.netty" level="INFO">
19+
<AppenderRef ref="Console" />
20+
</Logger>
1821
</Loggers>
1922
</Configuration>

proto/ratelimit.proto

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
syntax = "proto3";
2+
package beemo.ratelimit;
3+
4+
option java_multiple_files = true;
5+
option java_package = "gg.beemo.latte.proto";
6+
option java_outer_classname = "RatelimitProto";
7+
8+
9+
service Ratelimit {
10+
rpc ReserveQuota(RatelimitRequest) returns (RatelimitQuota);
11+
}
12+
13+
message RatelimitRequest {
14+
RatelimitType type = 1;
15+
fixed64 client_id = 2;
16+
optional bool probe_only = 3;
17+
optional uint32 max_delay = 4;
18+
}
19+
20+
message RatelimitQuota {
21+
bool granted = 1;
22+
uint64 at = 2;
23+
}
24+
25+
enum RatelimitType {
26+
GLOBAL = 0;
27+
IDENTIFY = 1;
28+
}

vanilla/.env.example

+10-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@
33
# your environment variables in quotes unless it is part of it.
44

55
#------------
6-
# Kafka Connection
6+
# RabbitMQ Connection
77
#---------------
8-
KAFKA_HOST=
9-
KAFKA_USE_TLS=
8+
RABBIT_HOST=
9+
RABBIT_USE_TLS=
10+
RABBIT_USERNAME=
11+
RABBIT_PASSWORD=
12+
13+
#------------
14+
# gRPC Server
15+
#---------------
16+
GRPC_PORT=

vanilla/build.gradle.kts

+39-3
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,63 @@
11
plugins {
22
application
33
java
4-
kotlin("jvm") version "1.9.20"
4+
kotlin("jvm") version "2.0.21"
5+
id("com.google.protobuf") version "0.9.4"
56
}
67

78
group = "gg.beemo.vanilla"
89
version = "1.0.0"
910

11+
val grpcVersion = "1.68.0"
12+
val grpcKotlinStubVersion = "1.4.1"
13+
val grpcProtobufVersion = "4.28.2"
14+
1015
dependencies {
1116
// Kotlin
12-
val kotlinCoroutinesVersion = "1.7.3"
17+
val kotlinCoroutinesVersion = "1.9.0"
1318
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
1419

1520
// Beemo shared code
1621
implementation("gg.beemo.latte:latte")
1722

23+
// gRPC
24+
implementation("io.grpc:grpc-netty-shaded:$grpcVersion")
25+
implementation("io.grpc:grpc-protobuf:$grpcVersion")
26+
implementation("io.grpc:grpc-kotlin-stub:$grpcKotlinStubVersion")
27+
implementation("com.google.protobuf:protobuf-kotlin:$grpcProtobufVersion")
28+
1829
// Logging
19-
val log4jVersion = "2.22.0"
30+
val log4jVersion = "2.24.1"
2031
implementation("org.apache.logging.log4j:log4j-api:$log4jVersion")
2132
implementation("org.apache.logging.log4j:log4j-core:$log4jVersion")
2233
implementation("org.apache.logging.log4j:log4j-slf4j2-impl:$log4jVersion")
2334
}
2435

36+
protobuf {
37+
protoc {
38+
artifact = "com.google.protobuf:protoc:$grpcProtobufVersion"
39+
}
40+
plugins {
41+
create("grpc") {
42+
artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion"
43+
}
44+
create("grpckt") {
45+
artifact = "io.grpc:protoc-gen-grpc-kotlin:$grpcKotlinStubVersion:jdk8@jar"
46+
}
47+
}
48+
generateProtoTasks {
49+
all().forEach {
50+
it.plugins {
51+
create("grpc")
52+
create("grpckt")
53+
}
54+
it.builtins {
55+
create("kotlin")
56+
}
57+
}
58+
}
59+
}
60+
2561
repositories {
2662
mavenCentral()
2763
}

vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt renamed to vanilla/src/main/java/gg/beemo/vanilla/BrokerRpcRatelimitClient.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ import kotlin.time.Duration.Companion.seconds
1414
// Give request expiry a bit of leeway in case of clock drift
1515
private val EXPIRY_GRACE_PERIOD = 5.seconds.inWholeMilliseconds
1616

17-
class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
17+
class BrokerRpcRatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
1818

1919
private val log by Log
20-
private val globalRatelimitProvider = RatelimitProvider(50, 1.seconds)
21-
private val identifyRatelimitProvider = RatelimitProvider(1, 5.seconds)
20+
private val globalRatelimitProvider = KafkaRatelimitProvider(50, 1.seconds)
21+
private val identifyRatelimitProvider = KafkaRatelimitProvider(1, 5.seconds)
2222

2323
init {
2424
rpc<SharedRatelimitData.RatelimitRequestData, Unit>(
@@ -54,7 +54,7 @@ class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
5454

5555
}
5656

57-
private class RatelimitProvider(private val burst: Int, private val duration: Duration) {
57+
private class KafkaRatelimitProvider(private val burst: Int, private val duration: Duration) {
5858

5959
private val limiters = ConcurrentHashMap<String, SuspendingRatelimit>()
6060

vanilla/src/main/java/gg/beemo/vanilla/Config.java

+2
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ public class Config {
1010

1111
public static String RABBIT_PASSWORD = "guest";
1212

13+
public static int GRPC_PORT = 1337;
14+
1315
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package gg.beemo.vanilla
2+
3+
import gg.beemo.latte.logging.Log
4+
import gg.beemo.latte.proto.RatelimitGrpcKt
5+
import gg.beemo.latte.proto.RatelimitQuota
6+
import gg.beemo.latte.proto.ratelimitQuota
7+
import gg.beemo.latte.proto.RatelimitRequest
8+
import gg.beemo.latte.proto.RatelimitType
9+
import kotlinx.coroutines.sync.Mutex
10+
import kotlinx.coroutines.sync.withLock
11+
import java.util.LinkedList
12+
import java.util.concurrent.ConcurrentHashMap
13+
import kotlin.time.Duration
14+
import kotlin.time.Duration.Companion.seconds
15+
16+
class GrpcRatelimitService : RatelimitGrpcKt.RatelimitCoroutineImplBase() {
17+
18+
private val log by Log
19+
private val globalRatelimits = ClientRatelimits(50, 1.seconds)
20+
private val identifyRatelimits = ClientRatelimits(1, 5.seconds)
21+
22+
override suspend fun reserveQuota(request: RatelimitRequest): RatelimitQuota {
23+
val clientRatelimits = when (request.type) {
24+
RatelimitType.GLOBAL -> globalRatelimits
25+
RatelimitType.IDENTIFY -> identifyRatelimits
26+
else -> throw IllegalArgumentException("Unknown ratelimit type ${request.type}")
27+
}
28+
29+
val ratelimit = clientRatelimits.getClientRatelimit(request.clientId)
30+
val maxDelay = if (request.hasMaxDelay()) request.maxDelay.toLong() else null
31+
val (granted, at) = ratelimit.reserveQuota(request.probeOnly, maxDelay)
32+
val delay = (at - System.currentTimeMillis()).coerceAtLeast(0)
33+
34+
if (request.probeOnly) {
35+
log.debug(
36+
"Probed {} quota slot for clientId {} is at {} (in {} ms)",
37+
request.type,
38+
request.clientId,
39+
at,
40+
delay
41+
)
42+
} else if (granted) {
43+
log.debug(
44+
"Reserved {} quota slot for clientId {} at {} (in {} ms)",
45+
request.type,
46+
request.clientId,
47+
at,
48+
delay
49+
)
50+
} else {
51+
val maxTimestamp = if (maxDelay != null) System.currentTimeMillis() + maxDelay else null
52+
log.debug(
53+
"Failed to reserve {} quota slot for clientId {}, next slot would be at {} (in {} ms), requested max delay was {} (-> {})",
54+
request.type,
55+
request.clientId,
56+
at,
57+
delay,
58+
maxDelay,
59+
maxTimestamp
60+
)
61+
}
62+
63+
return ratelimitQuota {
64+
this.granted = granted
65+
this.at = at
66+
}
67+
}
68+
69+
}
70+
71+
private class ClientRatelimits(private val burst: Int, private val duration: Duration) {
72+
73+
private val limiters = ConcurrentHashMap<Long, RatelimitQueue>()
74+
75+
fun getClientRatelimit(clientId: Long): RatelimitQueue = limiters.computeIfAbsent(clientId) {
76+
RatelimitQueue(burst, duration)
77+
}
78+
79+
}
80+
81+
data class RatelimitSlot(
82+
var usedQuota: Int,
83+
val startsAt: Long,
84+
val endsAt: Long,
85+
)
86+
87+
private class RatelimitQueue(private val burst: Int, private val duration: Duration) {
88+
89+
private val queue = LinkedList<RatelimitSlot>()
90+
private val lock = Mutex()
91+
92+
suspend fun reserveQuota(probeOnly: Boolean = false, maxDelay: Long? = null): Pair<Boolean, Long> =
93+
lock.withLock {
94+
val now = System.currentTimeMillis()
95+
96+
// Clean up expired slots
97+
while (queue.isNotEmpty() && now > queue.first.endsAt) {
98+
queue.removeFirst()
99+
}
100+
101+
// Find free slot at the end of the queue
102+
val lastSlot = queue.lastOrNull()
103+
// No slots are used, so ratelimit is immediately available
104+
if (lastSlot == null) {
105+
// No timeout to check if we can immediately grant quota
106+
if (probeOnly) {
107+
return@withLock false to 0
108+
}
109+
queue.add(RatelimitSlot(1, now, now + duration.inWholeMilliseconds))
110+
return@withLock true to 0
111+
}
112+
113+
// Check if slot still has quota available
114+
if (lastSlot.usedQuota < burst) {
115+
val exceedsDelay = maxDelay != null && lastSlot.startsAt > now + maxDelay
116+
if (exceedsDelay || probeOnly) {
117+
return@withLock false to lastSlot.startsAt
118+
}
119+
lastSlot.usedQuota++
120+
return@withLock true to lastSlot.startsAt
121+
}
122+
123+
// Slot is full, create new slot
124+
val exceedsDelay = maxDelay != null && lastSlot.endsAt > now + maxDelay
125+
if (exceedsDelay || probeOnly) {
126+
return@withLock false to lastSlot.endsAt
127+
}
128+
val nextStart = lastSlot.endsAt
129+
val nextEnd = nextStart + duration.inWholeMilliseconds
130+
queue.add(RatelimitSlot(1, nextStart, nextEnd))
131+
return@withLock true to nextStart
132+
}
133+
134+
}

vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt

+12-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import gg.beemo.latte.CommonConfig
44
import gg.beemo.latte.broker.rabbitmq.RabbitConnection
55
import gg.beemo.latte.config.Configurator
66
import gg.beemo.latte.logging.Log
7-
import gg.beemo.latte.logging.log
7+
import io.grpc.Server
8+
import io.grpc.ServerBuilder
89
import kotlinx.coroutines.runBlocking
910
import org.apache.logging.log4j.LogManager
1011

@@ -22,19 +23,26 @@ object Vanilla {
2223
val brokerConnection = RabbitConnection(
2324
rabbitHosts = Config.RABBIT_HOST,
2425
serviceName = CommonConfig.BrokerServices.VANILLA,
25-
instanceId = "0", // There will only ever be one instance of vanilla
26+
instanceId = "0", // There will only ever be one instance of vanilla
2627
useTls = Config.RABBIT_USE_TLS,
2728
username = Config.RABBIT_USERNAME,
2829
password = Config.RABBIT_PASSWORD,
2930
)
3031

31-
log.debug("Initializing Kafka Ratelimit client")
32-
val ratelimitClient = RatelimitClient(brokerConnection)
32+
log.debug("Initializing Broker Ratelimit client")
33+
val ratelimitClient = BrokerRpcRatelimitClient(brokerConnection)
34+
35+
log.debug("Initializing gRPC Ratelimit client")
36+
val grpcServer: Server = ServerBuilder.forPort(Config.GRPC_PORT)
37+
.addService(GrpcRatelimitService())
38+
.build()
39+
.start()
3340

3441
Runtime.getRuntime().addShutdownHook(Thread({
3542
log.info("Destroying everything")
3643
ratelimitClient.destroy()
3744
brokerConnection.destroy()
45+
grpcServer.shutdown().awaitTermination()
3846
LogManager.shutdown(true, true)
3947
}, "Vanilla Shutdown Hook"))
4048

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../../proto/ratelimit.proto

0 commit comments

Comments
 (0)