Skip to content

Commit

Permalink
m5l4 - cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
evgnep committed Nov 9, 2023
1 parent d3b3019 commit 5f89cc9
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 1 deletion.
3 changes: 3 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ exposedVersion=0.41.1

# Kafka
kafkaVersion=3.4.0

# Cassandra
cassandraDriverVersion=4.13.0
32 changes: 32 additions & 0 deletions ok-marketplace-repo-cassandra/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
plugins {
kotlin("jvm")
kotlin("kapt")
}

dependencies {
val coroutinesVersion: String by project
val cassandraDriverVersion: String by project
val testContainersVersion: String by project
val logbackVersion: String by project
val kotlinLoggingJvmVersion: String by project
val kmpUUIDVersion: String by project

implementation(project(":ok-marketplace-common"))

implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$coroutinesVersion")

implementation("com.benasher44:uuid:$kmpUUIDVersion")

implementation("com.datastax.oss:java-driver-core:$cassandraDriverVersion")
implementation("com.datastax.oss:java-driver-query-builder:$cassandraDriverVersion")
kapt("com.datastax.oss:java-driver-mapper-processor:$cassandraDriverVersion")
implementation("com.datastax.oss:java-driver-mapper-runtime:$cassandraDriverVersion")

// log
implementation("ch.qos.logback:logback-classic:$logbackVersion")
implementation("io.github.microutils:kotlin-logging-jvm:$kotlinLoggingJvmVersion")

testImplementation(project(":ok-marketplace-repo-tests"))
testImplementation("org.testcontainers:cassandra:$testContainersVersion")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ru.otus.otuskotlin.marketplace.backend.repo.cassandra

import com.datastax.oss.driver.api.mapper.annotations.Dao
import com.datastax.oss.driver.api.mapper.annotations.Delete
import com.datastax.oss.driver.api.mapper.annotations.Insert
import com.datastax.oss.driver.api.mapper.annotations.QueryProvider
import com.datastax.oss.driver.api.mapper.annotations.Select
import com.datastax.oss.driver.api.mapper.annotations.Update
import ru.otus.otuskotlin.marketplace.backend.repo.cassandra.model.AdCassandraDTO
import ru.otus.otuskotlin.marketplace.common.repo.DbAdFilterRequest
import java.util.concurrent.CompletionStage

@Dao
interface AdCassandraDAO {
@Insert
fun create(dto: AdCassandraDTO): CompletionStage<Unit>

@Select
fun read(id: String): CompletionStage<AdCassandraDTO?>

@Update(customIfClause = "lock = :prevLock")
fun update(dto: AdCassandraDTO, prevLock: String): CompletionStage<Boolean>

@Delete(customWhereClause = "id = :id", customIfClause = "lock = :prevLock", entityClass = [AdCassandraDTO::class])
fun delete(id: String, prevLock: String): CompletionStage<Boolean>

@QueryProvider(providerClass = AdCassandraSearchProvider::class, entityHelpers = [AdCassandraDTO::class])
fun search(filter: DbAdFilterRequest): CompletionStage<Collection<AdCassandraDTO>>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package ru.otus.otuskotlin.marketplace.backend.repo.cassandra

import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import com.datastax.oss.driver.api.mapper.MapperContext
import com.datastax.oss.driver.api.mapper.entity.EntityHelper
import com.datastax.oss.driver.api.querybuilder.QueryBuilder
import ru.otus.otuskotlin.marketplace.backend.repo.cassandra.model.AdCassandraDTO
import ru.otus.otuskotlin.marketplace.backend.repo.cassandra.model.toTransport
import ru.otus.otuskotlin.marketplace.common.models.MkplDealSide
import ru.otus.otuskotlin.marketplace.common.models.MkplUserId
import ru.otus.otuskotlin.marketplace.common.repo.DbAdFilterRequest
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.BiConsumer

class AdCassandraSearchProvider(
private val context: MapperContext,
private val entityHelper: EntityHelper<AdCassandraDTO>
) {
fun search(filter: DbAdFilterRequest): CompletionStage<Collection<AdCassandraDTO>> {
var select = entityHelper.selectStart().allowFiltering()

if (filter.titleFilter.isNotBlank()) {
select = select
.whereColumn(AdCassandraDTO.COLUMN_TITLE)
.like(QueryBuilder.literal("%${filter.titleFilter}%"))
}
if (filter.ownerId != MkplUserId.NONE) {
select = select
.whereColumn(AdCassandraDTO.COLUMN_OWNER_ID)
.isEqualTo(QueryBuilder.literal(filter.ownerId.asString(), context.session.context.codecRegistry))
}
if (filter.dealSide != MkplDealSide.NONE) {
select = select
.whereColumn(AdCassandraDTO.COLUMN_AD_TYPE)
.isEqualTo(QueryBuilder.literal(filter.dealSide.toTransport(), context.session.context.codecRegistry))
}

val asyncFetcher = AsyncFetcher()

context.session
.executeAsync(select.build())
.whenComplete(asyncFetcher)

return asyncFetcher.stage
}

inner class AsyncFetcher : BiConsumer<AsyncResultSet?, Throwable?> {
private val buffer = mutableListOf<AdCassandraDTO>()
private val future = CompletableFuture<Collection<AdCassandraDTO>>()
val stage: CompletionStage<Collection<AdCassandraDTO>> = future

override fun accept(resultSet: AsyncResultSet?, t: Throwable?) {
when {
t != null -> future.completeExceptionally(t)
resultSet == null -> future.completeExceptionally(IllegalStateException("ResultSet should not be null"))
else -> {
buffer.addAll(resultSet.currentPage().map { entityHelper.get(it, false) })
if (resultSet.hasMorePages())
resultSet.fetchNextPage().whenComplete(this)
else
future.complete(buffer)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ru.otus.otuskotlin.marketplace.backend.repo.cassandra

import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.mapper.annotations.DaoFactory
import com.datastax.oss.driver.api.mapper.annotations.DaoKeyspace
import com.datastax.oss.driver.api.mapper.annotations.DaoTable
import com.datastax.oss.driver.api.mapper.annotations.Mapper

@Mapper
interface CassandraMapper {
@DaoFactory
fun adDao(@DaoKeyspace keyspace: String, @DaoTable tableName: String): AdCassandraDAO

companion object {
fun builder(session: CqlSession) = CassandraMapperBuilder(session)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ru.otus.otuskotlin.marketplace.backend.repo.cassandra

import com.datastax.oss.driver.api.core.cql.Statement
import com.datastax.oss.driver.api.core.type.reflect.GenericType
import com.datastax.oss.driver.api.mapper.MapperContext
import com.datastax.oss.driver.api.mapper.entity.EntityHelper
import com.datastax.oss.driver.api.mapper.result.MapperResultProducer
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage

class CompletionStageOfUnitProducer : MapperResultProducer {
companion object {
val PRODUCED_TYPE = object : GenericType<CompletionStage<Unit>>() {}
}

override fun canProduce(resultType: GenericType<*>): Boolean =
resultType == PRODUCED_TYPE

override fun execute(
statement: Statement<*>,
context: MapperContext,
entityHelper: EntityHelper<*>?
): CompletionStage<Unit> {
val result = CompletableFuture<Unit>()
context.session.executeAsync(statement).whenComplete { _, e ->
if (e != null) result.completeExceptionally(e)
else result.complete(Unit)
}
return result
}

override fun wrapError(e: Exception): CompletionStage<Unit> =
CompletableFutures.failedFuture(e)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.otus.otuskotlin.marketplace.backend.repo.cassandra

import com.datastax.oss.driver.api.mapper.result.MapperResultProducer
import com.datastax.oss.driver.api.mapper.result.MapperResultProducerService

class KotlinProducerService : MapperResultProducerService {
override fun getProducers(): MutableIterable<MapperResultProducer> =
mutableListOf(CompletionStageOfUnitProducer())
}
Loading

0 comments on commit 5f89cc9

Please sign in to comment.