Skip to content

Commit

Permalink
fix: add search topics
Browse files Browse the repository at this point in the history
  • Loading branch information
storytellerF committed Nov 17, 2024
1 parent 8140d4d commit 2a71ac4
Show file tree
Hide file tree
Showing 66 changed files with 1,565 additions and 1,239 deletions.
8 changes: 4 additions & 4 deletions backend/src/main/kotlin/com/perraco/utils/SnowflakeFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ object SnowflakeFactory {

// Construct the ID.

return (lastTimestampMs.toULong() shl (MACHINE_ID_BITS + SEQUENCE_BITS)) or
(machineId!!.toULong() shl SEQUENCE_BITS) or
sequence.toULong()
return (lastTimestampMs.toLong() shl (MACHINE_ID_BITS + SEQUENCE_BITS)) or
(machineId!!.toLong() shl SEQUENCE_BITS) or
sequence.toLong()
}

/**
Expand All @@ -130,7 +130,7 @@ object SnowflakeFactory {
*/
fun parse(id: PrimaryKey): SnowflakeData {
// Extract the machine ID segment.
val machineIdSegment = (id shr SEQUENCE_BITS) and MAX_MACHINE_ID.toULong()
val machineIdSegment = (id shr SEQUENCE_BITS) and MAX_MACHINE_ID.toLong()

// Extract the timestamp segment.
val timestampMs: Long = (id shr (MACHINE_ID_BITS + SEQUENCE_BITS)).toLong()
Expand Down
4 changes: 3 additions & 1 deletion backend/src/main/kotlin/com/storyteller_f/BaseTable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.kotlin.datetime.datetime

abstract class BaseTable : Table() {
val id = ulong("id")
val id = customPrimaryKey("id")
val createdTime = datetime("created_time").index()

override val primaryKey = PrimaryKey(id)
}

abstract class BaseObj(val id: PrimaryKey, val createdTime: LocalDateTime)

fun Table.customPrimaryKey(name: String) = long(name)
46 changes: 32 additions & 14 deletions backend/src/main/kotlin/com/storyteller_f/DatabaseFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.storyteller_f
import com.storyteller_f.tables.*
import kotlinx.coroutines.Dispatchers
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.statements.InsertStatement
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.jetbrains.exposed.sql.transactions.transaction

Expand Down Expand Up @@ -55,65 +56,82 @@ object DatabaseFactory {
}
}

suspend fun <T> dbQuery(block: suspend () -> T): T =
newSuspendedTransaction(Dispatchers.IO) { block() }
suspend fun <T> dbQuery(block: suspend () -> T): Result<T> =
runCatching {
newSuspendedTransaction(Dispatchers.IO) { block() }
}

/**
* 带有transform
*/
suspend fun <T, R> query(transform: T.() -> R, block: suspend () -> T): R =
suspend fun <T, R> query(transform: T.() -> R, block: suspend () -> T): Result<R> =
dbQuery { transform(block()) }

/**
* 处理可能查询不到数据的问题
*/
suspend fun <T, R : Any?> queryNotNull(transform: T.() -> R?, block: suspend () -> T?): R? =
suspend fun <T, R : Any?> queryNotNull(transform: T.() -> R?, block: suspend () -> T?): Result<R?> =
query({
this?.let { transform(it) }
}) { block() }

/**
* 处理可能查询不到数据的问题
*/
suspend fun <T, R : Any?, R1> queryNotNull(
transform: R1.() -> R?,
resultRowTransform: (T) -> R1,
block: suspend () -> T?
): Result<R?> = queryNotNull({
resultRowTransform(this).transform()
}) { block() }

/**
* 带有transform
*/
suspend fun <T, R> mapQuery(transform: T.() -> R, block: suspend () -> SizedIterable<T>): List<R> =
suspend fun <T, R> mapQuery(transform: T.() -> R, block: suspend () -> SizedIterable<T>): Result<List<R>> =
dbQuery { block().map(transform) }

/**
* 带有transform
*/
suspend fun <T, R, R1> mapQuery(
transform: R1.() -> R,
typeTransform: (T) -> R1,
resultRowTransform: (T) -> R1,
block: suspend () -> SizedIterable<T>
): List<R> =
): Result<List<R>> =
dbQuery {
block().map(typeTransform).map(transform)
block().map(resultRowTransform).map(transform)
}

/**
* 查询第一个符合条件的数据
*
* @param transform 转换数据
* @param typeTransform 主要用于将ResultRow 转换成普通数据
* @param resultRowTransform 主要用于将ResultRow 转换成普通数据
*/
suspend fun <T, R, R1> first(
transform: R1.() -> T,
typeTransform: (R) -> R1,
resultRowTransform: (R) -> R1,
block: suspend () -> SizedIterable<R>
): T? = dbQuery {
block().limit(1).firstOrNull()?.let(typeTransform)?.let { transform(it) }
): Result<T?> = dbQuery {
block().limit(1).firstOrNull()?.let(resultRowTransform)?.let { transform(it) }
}

/**
* 检查数据是不是空
*/
suspend fun <T> empty(block: suspend () -> SizedIterable<T>): Boolean = dbQuery {
suspend fun <T> isEmpty(block: suspend () -> SizedIterable<T>): Result<Boolean> = dbQuery {
block().limit(1).empty()
}

suspend fun <T> count(block: suspend () -> SizedIterable<T>): Long = dbQuery {
suspend fun <T> count(block: suspend () -> SizedIterable<T>): Result<Long> = dbQuery {
block().count()
}

suspend fun insert(block: suspend () -> InsertStatement<Number>): Result<Int> = dbQuery {
block().insertedCount
}
}

const val PUBLIC_KEY_LENGTH = 512
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package com.storyteller_f.index

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient
import co.elastic.clients.elasticsearch._types.SortOrder
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery
import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery
import co.elastic.clients.elasticsearch._types.query_dsl.Operator
import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery
import co.elastic.clients.elasticsearch.core.SearchRequest
import co.elastic.clients.json.JsonData
import co.elastic.clients.json.jackson.JacksonJsonpMapper
import co.elastic.clients.transport.TransportUtils
import co.elastic.clients.transport.rest_client.RestClientTransport
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import com.storyteller_f.ElasticConnection
import com.storyteller_f.shared.type.PrimaryKey
import com.storyteller_f.types.PaginationResult
import io.github.aakira.napier.Napier
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.future.await
Expand All @@ -20,8 +28,8 @@ import java.io.File
import java.io.FileInputStream

class ElasticTopicDocumentService(private val connection: ElasticConnection) : TopicDocumentService {
override suspend fun saveDocument(topics: List<TopicDocument>) {
useElasticClient(connection) {
override suspend fun saveDocument(topics: List<TopicDocument>): Result<Unit> {
return useElasticClient(connection) {
topics.map { topic ->
index {
it.index("topics").id(topic.id.toString()).document(topic)
Expand All @@ -32,32 +40,75 @@ class ElasticTopicDocumentService(private val connection: ElasticConnection) : T
}
}

override suspend fun getDocument(idList: List<PrimaryKey>): List<TopicDocument?> {
override suspend fun getDocument(idList: List<PrimaryKey>): Result<List<TopicDocument?>> {
return useElasticClient(connection) {
idList.map { id ->
get({
it.index("topics")
.id(id.toString())
}, TopicDocument::class.java)
}.map {
it.await().source()
it.await()
}.map {
it.source()
}
}
}

override suspend fun clean() {
useElasticClient(connection) {
override suspend fun clean(): Result<Unit> {
return useElasticClient(connection) {
indices().delete {
it.index("topics")
}
}.await()
Unit
}
}

override suspend fun searchDocument(
word: List<String>,
size: Int,
nextTopicId: PrimaryKey?
): Result<PaginationResult<TopicDocument>> {
val contentQuery = MatchQuery.of { m ->
m.field("content")
.query(word.joinToString(" ")) // 多关键字匹配,忽略大小写
.operator(Operator.Or)
}._toQuery()

val idRangeQuery = RangeQuery.of { r ->
r.field("id")
.lt(JsonData.of(nextTopicId ?: Long.MAX_VALUE))
}._toQuery()

val boolQuery = BoolQuery.of { b ->
b.must(contentQuery).must(idRangeQuery)
}

// 构建排序条件:按 ID 升序排序
val request = SearchRequest.of { s ->
s.index("topics") // 指定索引名称
.query(boolQuery._toQuery())
.sort { sort ->
sort.field { f ->
f.field("id").order(SortOrder.Asc)
}
}.trackScores(true)
}
return useElasticClient(connection) {
val response = search<TopicDocument>(request, TopicDocument::class.java).await()
val hits = response.hits()
val total = hits.total()
PaginationResult(hits.hits().mapNotNull {
it.source()
}, total?.value() ?: 0)
}
}
}

private suspend fun <T> useElasticClient(
elasticConnection: ElasticConnection,
block: suspend ElasticsearchAsyncClient.() -> T
): T {
): Result<T> {
val crtStream = withContext(Dispatchers.IO) {
Napier.i(message = "cert path ${File(elasticConnection.certFile).canonicalPath}")
FileInputStream(elasticConnection.certFile)
Expand All @@ -70,20 +121,22 @@ private suspend fun <T> useElasticClient(
AuthScope.ANY,
UsernamePasswordCredentials(elasticConnection.name, elasticConnection.pass)
)
return RestClient
.builder(HttpHost.create(elasticConnection.url))
.setHttpClientConfigCallback { p0 ->
p0.setSSLContext(sslContext)
p0.setDefaultCredentialsProvider(credsProv)
}
.build().use { restClient ->
RestClientTransport(
restClient,
JacksonJsonpMapper().apply {
objectMapper().registerKotlinModule()
return runCatching {
RestClient
.builder(HttpHost.create(elasticConnection.url))
.setHttpClientConfigCallback { p0 ->
p0.setSSLContext(sslContext)
p0.setDefaultCredentialsProvider(credsProv)
}
.build().use { restClient ->
RestClientTransport(
restClient,
JacksonJsonpMapper().apply {
objectMapper().registerKotlinModule()
}
).use { transport ->
ElasticsearchAsyncClient(transport).block()
}
).use { transport ->
ElasticsearchAsyncClient(transport).block()
}
}
}
}
Loading

0 comments on commit 2a71ac4

Please sign in to comment.