Skip to content

Commit 47c1927

Browse files
committed
Initial GRPC implementation
1 parent 88f1fa2 commit 47c1927

File tree

8 files changed

+164
-39
lines changed

8 files changed

+164
-39
lines changed

latte/src/main/java/gg/beemo/latte/util/SuspendingRatelimit.kt renamed to latte/src/main/java/gg/beemo/latte/util/Ratelimit.kt

+30-26
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,44 @@ import kotlinx.coroutines.sync.Semaphore
55
import kotlinx.coroutines.sync.withPermit
66
import kotlin.time.Duration
77

8-
class SuspendingRatelimit(private val burst: Int, private val duration: Duration) {
8+
class Ratelimit(val burst: Int, val duration: Duration) {
99
@Volatile
1010
private var remainingQuota: Int = burst
11+
val remaining: Int get() = remainingQuota
1112

1213
@Volatile
1314
private var resetTimestamp: Long = 0
15+
val resetAt: Long get() = resetTimestamp
1416

1517
private val quotaRequestSem = Semaphore(1)
1618

19+
suspend fun requestQuota() {
20+
quotaRequestSem.withPermit {
21+
if (remainingQuota <= 0) {
22+
tryResetQuota()
23+
val waitTime = calculateWaitTime()
24+
delay(waitTime)
25+
}
26+
tryResetQuota()
27+
check(remainingQuota > 0)
28+
remainingQuota--
29+
}
30+
}
31+
32+
fun tryRequestQuota(): Pair<Boolean, Long> {
33+
tryResetQuota()
34+
if (remainingQuota <= 0) {
35+
return false to calculateWaitTime()
36+
}
37+
check(remainingQuota > 0)
38+
remainingQuota--
39+
return true to calculateWaitTime()
40+
}
41+
42+
fun addQuota(amount: Int) {
43+
remainingQuota += amount
44+
}
45+
1746
fun overrideRatelimit(
1847
remainingQuota: Int,
1948
resetTimestamp: Long,
@@ -32,29 +61,4 @@ class SuspendingRatelimit(private val burst: Int, private val duration: Duration
3261
resetTimestamp = System.currentTimeMillis() + duration.inWholeMilliseconds
3362
}
3463
}
35-
36-
suspend fun requestQuota() {
37-
quotaRequestSem.withPermit {
38-
if (remainingQuota <= 0) {
39-
val waitTime = calculateWaitTime()
40-
delay(waitTime)
41-
}
42-
tryResetQuota()
43-
44-
check(remainingQuota > 0)
45-
remainingQuota--
46-
}
47-
}
48-
49-
fun tryRequestQuota(): Pair<Boolean, Long?> {
50-
if (remainingQuota <= 0) {
51-
val waitTime = calculateWaitTime()
52-
return false to waitTime
53-
}
54-
tryResetQuota()
55-
56-
check(remainingQuota > 0)
57-
remainingQuota--
58-
return true to null
59-
}
6064
}

proto/ratelimit.proto

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
syntax = "proto3";
2+
3+
option java_multiple_files = true;
4+
option java_package = "gg.beemo.vanilla";
5+
option java_outer_classname = "RatelimitProto";
6+
7+
package ratelimit;
8+
9+
service Ratelimit {
10+
rpc RequestQuota (RatelimitRequest) returns (RatelimitResponse) {}
11+
}
12+
13+
message RatelimitRequest {
14+
RatelimitType type = 1;
15+
fixed64 clientId = 2;
16+
}
17+
18+
message RatelimitResponse {
19+
bool granted = 1;
20+
uint32 limit = 2;
21+
uint32 remaining = 3;
22+
uint64 reset = 4;
23+
uint32 resetAfter = 5;
24+
}
25+
26+
enum RatelimitType {
27+
GLOBAL = 0;
28+
IDENTIFY = 1;
29+
}

vanilla/build.gradle.kts

+36-3
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,60 @@
11
plugins {
22
application
33
java
4-
kotlin("jvm") version "1.9.20"
4+
kotlin("jvm") version "2.0.21"
5+
id("com.google.protobuf") version "0.9.4"
56
}
67

78
group = "gg.beemo.vanilla"
89
version = "1.0.0"
910

1011
dependencies {
1112
// Kotlin
12-
val kotlinCoroutinesVersion = "1.7.3"
13+
val kotlinCoroutinesVersion = "1.9.0"
1314
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
1415

1516
// Beemo shared code
1617
implementation("gg.beemo.latte:latte")
1718

19+
// gRPC
20+
val grpcVersion = "1.68.0"
21+
implementation("io.grpc:grpc-netty-shaded:$grpcVersion")
22+
implementation("io.grpc:grpc-protobuf:$grpcVersion")
23+
implementation("io.grpc:grpc-kotlin-stub:1.4.1")
24+
implementation("com.google.protobuf:protobuf-kotlin:4.28.2")
25+
1826
// Logging
19-
val log4jVersion = "2.22.0"
27+
val log4jVersion = "2.24.1"
2028
implementation("org.apache.logging.log4j:log4j-api:$log4jVersion")
2129
implementation("org.apache.logging.log4j:log4j-core:$log4jVersion")
2230
implementation("org.apache.logging.log4j:log4j-slf4j2-impl:$log4jVersion")
2331
}
2432

33+
protobuf {
34+
protoc {
35+
artifact = "com.google.protobuf:protoc:4.28.2"
36+
}
37+
plugins {
38+
create("grpc") {
39+
artifact = "io.grpc:protoc-gen-grpc-java:1.68.0"
40+
}
41+
create("grpckt") {
42+
artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1:jdk8@jar"
43+
}
44+
}
45+
generateProtoTasks {
46+
all().forEach {
47+
it.plugins {
48+
create("grpc")
49+
create("grpckt")
50+
}
51+
it.builtins {
52+
create("kotlin")
53+
}
54+
}
55+
}
56+
}
57+
2558
repositories {
2659
mavenCentral()
2760
}

vanilla/src/main/java/gg/beemo/vanilla/Config.java

+2
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ public class Config {
1010

1111
public static String RABBIT_PASSWORD = "guest";
1212

13+
public static int GRPC_PORT = 1337;
14+
1315
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package gg.beemo.vanilla
2+
3+
import gg.beemo.latte.logging.Log
4+
import gg.beemo.latte.util.Ratelimit
5+
import java.util.concurrent.ConcurrentHashMap
6+
import kotlin.time.Duration
7+
import kotlin.time.Duration.Companion.seconds
8+
9+
// TODO https://github.com/grpc/grpc-kotlin/blob/master/examples/server/src/main/kotlin/io/grpc/examples/animals/AnimalsServer.kt
10+
11+
class GrpcRatelimitService : RatelimitGrpcKt.RatelimitCoroutineImplBase() {
12+
13+
private val log by Log
14+
private val globalRatelimits = RatelimitMap(50, 1.seconds)
15+
private val identifyRatelimits = RatelimitMap(1, 5.seconds)
16+
17+
override suspend fun requestQuota(request: RatelimitRequest): RatelimitResponse {
18+
val (ratelimitMap, typeString) = when (request.type) {
19+
RatelimitType.GLOBAL -> globalRatelimits to "global"
20+
RatelimitType.IDENTIFY -> identifyRatelimits to "identify"
21+
else -> throw IllegalArgumentException("Unknown ratelimit type ${request.type}")
22+
}
23+
val ratelimit = ratelimitMap.getClientRatelimit(request.clientId)
24+
// TODO Do we want to make a blocking version?
25+
val (granted, resetAfter) = ratelimit.tryRequestQuota()
26+
log.debug("Got '{}' quota request from clientId {}, was tranted: {}", typeString, request.clientId, granted)
27+
28+
return ratelimitResponse {
29+
this.granted = granted
30+
this.limit = ratelimit.burst
31+
this.remaining = ratelimit.remaining
32+
this.reset = ratelimit.resetAt
33+
this.resetAfter = resetAfter.toInt()
34+
}
35+
}
36+
37+
}
38+
39+
private class RatelimitMap(private val burst: Int, private val duration: Duration) {
40+
41+
private val limiters = ConcurrentHashMap<Long, Ratelimit>()
42+
43+
fun getClientRatelimit(clientId: Long): Ratelimit = limiters.computeIfAbsent(clientId) {
44+
Ratelimit(burst, duration)
45+
}
46+
47+
}

vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt

+8-8
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,19 @@ import gg.beemo.latte.broker.IgnoreRpcRequest
66
import gg.beemo.latte.broker.rpc.RpcStatus
77
import gg.beemo.latte.logging.Log
88
import gg.beemo.latte.ratelimit.SharedRatelimitData
9-
import gg.beemo.latte.util.SuspendingRatelimit
9+
import gg.beemo.latte.util.Ratelimit
1010
import java.util.concurrent.ConcurrentHashMap
1111
import kotlin.time.Duration
1212
import kotlin.time.Duration.Companion.seconds
1313

1414
// Give request expiry a bit of leeway in case of clock drift
1515
private val EXPIRY_GRACE_PERIOD = 5.seconds.inWholeMilliseconds
1616

17-
class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
17+
class KafkaRatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
1818

1919
private val log by Log
20-
private val globalRatelimitProvider = RatelimitProvider(50, 1.seconds)
21-
private val identifyRatelimitProvider = RatelimitProvider(1, 5.seconds)
20+
private val globalRatelimitProvider = KafkaRatelimitProvider(50, 1.seconds)
21+
private val identifyRatelimitProvider = KafkaRatelimitProvider(1, 5.seconds)
2222

2323
init {
2424
rpc<SharedRatelimitData.RatelimitRequestData, Unit>(
@@ -54,12 +54,12 @@ class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
5454

5555
}
5656

57-
private class RatelimitProvider(private val burst: Int, private val duration: Duration) {
57+
private class KafkaRatelimitProvider(private val burst: Int, private val duration: Duration) {
5858

59-
private val limiters = ConcurrentHashMap<String, SuspendingRatelimit>()
59+
private val limiters = ConcurrentHashMap<String, Ratelimit>()
6060

61-
fun getClientRatelimit(clientId: String): SuspendingRatelimit = limiters.computeIfAbsent(clientId) {
62-
SuspendingRatelimit(burst, duration)
61+
fun getClientRatelimit(clientId: String): Ratelimit = limiters.computeIfAbsent(clientId) {
62+
Ratelimit(burst, duration)
6363
}
6464

6565
}

vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt

+11-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import gg.beemo.latte.broker.rabbitmq.RabbitConnection
55
import gg.beemo.latte.config.Configurator
66
import gg.beemo.latte.logging.Log
77
import gg.beemo.latte.logging.log
8+
import io.grpc.Server
9+
import io.grpc.ServerBuilder
810
import kotlinx.coroutines.runBlocking
911
import org.apache.logging.log4j.LogManager
1012

@@ -22,19 +24,26 @@ object Vanilla {
2224
val brokerConnection = RabbitConnection(
2325
rabbitHosts = Config.RABBIT_HOST,
2426
serviceName = CommonConfig.BrokerServices.VANILLA,
25-
instanceId = "0", // There will only ever be one instance of vanilla
27+
instanceId = "0", // There will only ever be one instance of vanilla
2628
useTls = Config.RABBIT_USE_TLS,
2729
username = Config.RABBIT_USERNAME,
2830
password = Config.RABBIT_PASSWORD,
2931
)
3032

3133
log.debug("Initializing Kafka Ratelimit client")
32-
val ratelimitClient = RatelimitClient(brokerConnection)
34+
val ratelimitClient = KafkaRatelimitClient(brokerConnection)
35+
36+
val grpcServer: Server = ServerBuilder.forPort(Config.GRPC_PORT)
37+
.addService(GrpcRatelimitService())
38+
.build()
39+
.start()
3340

3441
Runtime.getRuntime().addShutdownHook(Thread({
3542
log.info("Destroying everything")
3643
ratelimitClient.destroy()
3744
brokerConnection.destroy()
45+
grpcServer.shutdown()
46+
grpcServer.awaitTermination()
3847
LogManager.shutdown(true, true)
3948
}, "Vanilla Shutdown Hook"))
4049

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../../proto/ratelimit.proto

0 commit comments

Comments
 (0)