Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(providers): handle additional error cases in Http/WsClients #179

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 77 additions & 54 deletions ethers-providers/src/main/kotlin/io/ethers/providers/HttpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.Response
import java.io.BufferedInputStream
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.InputStream
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Function
Expand Down Expand Up @@ -89,14 +89,36 @@ class HttpClient(
LOG.trc {
// reading from response body consumes it, so we need to create a new one
val arr = stream.readAllBytes()
stream = BufferedInputStream(ByteArrayInputStream(arr))
"Response: ${String(arr)}"
stream = ByteArrayInputStream(arr)
"Batch response: ${String(arr)}"
}

if (!it.isSuccessful) {
// complete all requests and the batch future
val bytes = stream.readAllBytes()

// first, try to decode a JSON response
try {
// TODO per the specification, json-rpc batch responses can be returned in any order
ByteArrayInputStream(bytes).useJsonParser {
var index = 0
val parser = this

parser.forEachArrayElement {
val result = parser.decodeNextResult(batch.requests[index].resultDecoder)
batch.responses[index].complete(result)
index++
}
}

ret.complete(true)
return
} catch (ignored: Exception) {
}

// second, if decoding fails, return the response as a message and complete all requests
// including the batch future
val message = "HTTP ${it.code}: ${it.message}"
val data = Jackson.MAPPER.valueToTree<JsonNode>(String(stream.readAllBytes()))
val data = Jackson.MAPPER.valueToTree<JsonNode>(String(bytes))
val error = RpcError(RpcError.CODE_CALL_FAILED, message, data)
val failure = failure(error)

Expand All @@ -111,32 +133,13 @@ class HttpClient(
}

// TODO per the specification, json-rpc batch responses can be returned in any order
Jackson.MAPPER.createAndInitParser(stream).use { p ->
stream.useJsonParser {
var index = 0
p.forEachArrayElement {
var result: Result<*, RpcError>? = null

p.forEachObjectField { field ->
when (field) {
"id" -> {}
"jsonrpc" -> {}
"result" -> {
result = success(batch.requests[index].resultDecoder.apply(p))
}

"error" -> {
result = failure(Jackson.MAPPER.readValue(p, RpcError::class.java))
}
}
}

if (result == null) {
batch.responses[index].complete(ERROR_INVALID_RESPONSE)
return
}
val parser = this

parser.forEachArrayElement {
val result = parser.decodeNextResult(batch.requests[index].resultDecoder)
batch.responses[index].complete(result)

index++
}
}
Expand Down Expand Up @@ -191,40 +194,31 @@ class HttpClient(
LOG.trc {
// reading from response body consumes it, so we need to create a new one
val arr = stream.readAllBytes()
stream = BufferedInputStream(ByteArrayInputStream(arr))
stream = ByteArrayInputStream(arr)
"Response: ${String(arr)}"
}

if (!it.isSuccessful) {
val bytes = stream.readAllBytes()

// first, try to decode a JSON response
try {
ret.complete(ByteArrayInputStream(bytes).decodeResult(resultDecoder))
return
} catch (ignored: Exception) {
}

// second, if decoding fails, return the response as a message
val message = "HTTP ${it.code}: ${it.message}"
val data = Jackson.MAPPER.valueToTree<JsonNode>(String(stream.readAllBytes()))
val data = Jackson.MAPPER.valueToTree<JsonNode>(String(bytes))
val error = RpcError(RpcError.CODE_CALL_FAILED, message, data)
LOG.err { "Call failed for method=$method, params=${params.contentToString()}: $error" }

ret.complete(failure(error))
return
}

Jackson.MAPPER.createAndInitParser(stream).use { p ->
var result: Result<T, RpcError>? = null
p.forEachObjectField { field ->
when (field) {
"id" -> {}
"jsonrpc" -> {}
"result" -> result = success(resultDecoder.apply(p))
"error" -> {
result = failure(Jackson.MAPPER.readValue(p, RpcError::class.java))
}
}
}

if (result == null) {
ret.complete(ERROR_INVALID_RESPONSE)
return
}

ret.complete(result)
}
ret.complete(stream.decodeResult(resultDecoder))
} catch (e: Exception) {
LOG.err(e) { "Error processing response for method=$method, params=${params.contentToString()}" }

Expand All @@ -238,6 +232,28 @@ class HttpClient(
return ret
}

private fun <T> InputStream.decodeResult(decoder: Function<JsonParser, T>): Result<T, RpcError> {
return useJsonParser { decodeNextResult(decoder) }
}

private fun <T> JsonParser.decodeNextResult(decoder: Function<JsonParser, T>): Result<T, RpcError> {
var result: Result<T, RpcError>? = null
this.forEachObjectField { field ->
when (field) {
"id" -> {}
"jsonrpc" -> {}
"result" -> result = success(decoder.apply(this))
"error" -> result = failure(Jackson.MAPPER.readValue(this, RpcError::class.java))
}
}

return result ?: ERROR_INVALID_RESPONSE
}

private inline fun <R> InputStream.useJsonParser(action: JsonParser.() -> R): R {
return Jackson.MAPPER.createAndInitParser(this).use(action)
}

override fun <T> subscribe(
params: Array<*>,
resultDecoder: Function<JsonParser, T>,
Expand All @@ -246,7 +262,7 @@ class HttpClient(
}

private fun BatchRpcRequest.toRequestBody(): RequestBody {
val output = DirectByteArrayOutputStream()
val output = DirectByteArrayOutputStream(requests.size * BYTE_BUFFER_DEFAULT_SIZE)
output.use { out ->
val gen = Jackson.MAPPER.createGenerator(out)

Expand All @@ -265,7 +281,7 @@ class HttpClient(
}

private fun createJsonRpcRequestBody(method: String, params: Array<*>): RequestBody {
val output = DirectByteArrayOutputStream()
val output = DirectByteArrayOutputStream(BYTE_BUFFER_DEFAULT_SIZE)

output.use { out ->
val gen = Jackson.MAPPER.createGenerator(out)
Expand All @@ -276,13 +292,20 @@ class HttpClient(
return output.internalBuffer.toRequestBody(JSON_MEDIA_TYPE, byteCount = output.size())
}

private class DirectByteArrayOutputStream : ByteArrayOutputStream() {
// contains trailing zeros since it's unlikely that the buffer will be exactly the right size
private class DirectByteArrayOutputStream(size: Int) : ByteArrayOutputStream(size) {
/**
* Return the internal buffer.
*
* **NOTE**: Contains trailing zeros since it's unlikely that the buffer will be exactly the right size.
* */
val internalBuffer: ByteArray
get() = buf
}

companion object {
// one of the smallest possible requests, `eth_chainID`, takes 59 bytes. Most requests use more,
// around 100 bytes, so we use a buffer of 128 bytes to try and avoid reallocations in most cases
private const val BYTE_BUFFER_DEFAULT_SIZE = 128
private val JSON_MEDIA_TYPE = "application/json".toMediaType()

private val ERROR_SUBSCRIPTION_UNSUPPORTED = failure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ class WsClient(
// messages are terminated by a new line. Remove it when logging to get nicer output
LOG.trc { "Processing message: ${msg?.removeSuffix(System.lineSeparator())}" }

handleMessage(msg!!)
try {
handleMessage(msg!!)
} catch (e: Exception) {
LOG.err(e) { "Error processing message, skipping: $msg" }
}
}

// second, check if we need to reconnect, initiating re-subscription for existing subs
Expand Down Expand Up @@ -206,6 +210,12 @@ class WsClient(
TimeUnit.MILLISECONDS,
)
}

if (!reconnectSuccessful) {
handleTimeouts(client.readTimeoutMillis.toLong().milliseconds)

Thread.sleep(2000L)
}
}

if (stopping) {
Expand Down Expand Up @@ -322,11 +332,7 @@ class WsClient(

// check and handle timed-out requests every 1000ms
if (lastTimeoutCheck.elapsedNow() > 1000.milliseconds) {
val timeout = client.readTimeoutMillis.toLong().milliseconds
removeTimedOutRequests(inFlightRequests, timeout)
removeTimedOutRequests(inFlightBatchRequests, timeout)
removeTimedOutRequests(inFlightSubscriptionRequests, timeout)

handleTimeouts(client.readTimeoutMillis.toLong().milliseconds)
lastTimeoutCheck = TimeSource.Monotonic.markNow()
}

Expand All @@ -352,6 +358,12 @@ class WsClient(
processorThread.start()
}

private fun handleTimeouts(timeout: Duration) {
removeTimedOutRequests(inFlightRequests, timeout)
removeTimedOutRequests(inFlightBatchRequests, timeout)
removeTimedOutRequests(inFlightSubscriptionRequests, timeout)
}

private fun <T : ExpiringRequest> removeTimedOutRequests(requests: MutableMap<Long, T>, timeout: Duration) {
// skip expiration if timeout is not set or requests is empty
if (!timeout.isPositive() || requests.isEmpty()) {
Expand Down