Skip to content

Commit

Permalink
Merge pull request #1303 from UdashFramework/close-stale-jetty-connec…
Browse files Browse the repository at this point in the history
…tions-on-monix-timeout

close jetty connections on monix timeout (or other cancellation)
  • Loading branch information
ddworak authored Dec 23, 2024
2 parents b8d59a4 + c149256 commit e991b56
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package rest

import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.ee8.servlet.{ServletHandler, ServletHolder}

import scala.concurrent.duration._
import scala.concurrent.duration.*

abstract class ServletBasedRestApiTest extends RestApiTest with UsesHttpServer {
override implicit def patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)

def maxPayloadSize: Int = 1024 * 1024
def serverTimeout: FiniteDuration = 10.seconds
Expand Down
39 changes: 28 additions & 11 deletions rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,25 @@ package rest

import io.udash.rest.raw.HttpErrorException
import io.udash.rest.raw.RawRest.HandleRequest
import sttp.client3.SttpBackend
import sttp.client3.{HttpClientFutureBackend, SttpBackend}

import scala.concurrent.duration._
import java.net.http.HttpClient
import java.time.Duration as JDuration
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}

trait SttpClientRestTest extends ServletBasedRestApiTest {
implicit val backend: SttpBackend[Future, Any] = SttpRestClient.defaultBackend()
/**
* Similar to the defaultHttpClient, but with a connection timeout
* significantly exceeding the value of the CallTimeout
*/
implicit val backend: SttpBackend[Future, Any] = HttpClientFutureBackend.usingClient(
HttpClient
.newBuilder()
.connectTimeout(JDuration.ofMillis(IdleTimout.toMillis))
.followRedirects(HttpClient.Redirect.NEVER)
.build()
)

def clientHandle: HandleRequest =
SttpRestClient.asHandleRequest[Future](s"$baseUrl/api")
Expand All @@ -21,22 +33,27 @@ trait SttpClientRestTest extends ServletBasedRestApiTest {
}

class SttpRestCallTest extends SttpClientRestTest with RestApiTestScenarios {
test("too large binary request") {
val future = proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5))
val exception = future.failed.futureValue
assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)"))
"too large binary request" in {
proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5))
.failed
.map { exception =>
assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)"))
}
}
}

class ServletTimeoutTest extends SttpClientRestTest {
override def serverTimeout: FiniteDuration = 500.millis

test("rest method timeout") {
val exception = proxy.neverGet.failed.futureValue
assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds"))
"rest method timeout" in {
proxy.neverGet
.failed
.map { exception =>
assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds"))
}
}

test("subsequent requests with timeout") {
"subsequent requests with timeout" in {
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
Expand Down
105 changes: 54 additions & 51 deletions rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package io.udash
package rest.jetty

import com.avsystem.commons._
import com.avsystem.commons.*
import com.avsystem.commons.annotation.explicitGenerics
import io.udash.rest.raw._
import io.udash.rest.raw.*
import io.udash.utils.URLEncoder
import monix.eval.Task
import org.eclipse.jetty.client.{BufferingResponseListener, BytesRequestContent, HttpClient, Result, StringRequestContent}
import monix.execution.Callback
import org.eclipse.jetty.client.*
import org.eclipse.jetty.http.{HttpCookie, HttpHeader, MimeTypes}

import java.nio.charset.Charset
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scala.concurrent.CancellationException
import scala.concurrent.duration.*

object JettyRestClient {
final val DefaultMaxResponseLength = 2 * 1024 * 1024
Expand All @@ -31,55 +32,57 @@ object JettyRestClient {
maxResponseLength: Int = DefaultMaxResponseLength,
timeout: Duration = DefaultTimeout
): RawRest.HandleRequest =
request => Task.async { callback =>
val path = baseUrl + PlainValue.encodePath(request.parameters.path)
val httpReq = client.newRequest(baseUrl).method(request.method.name)
request => Task(client.newRequest(baseUrl).method(request.method.name)).flatMap { httpReq =>
Task.async { (callback: Callback[Throwable, RestResponse]) =>
val path = baseUrl + PlainValue.encodePath(request.parameters.path)

httpReq.path(path)
request.parameters.query.entries.foreach {
case (name, PlainValue(value)) => httpReq.param(name, value)
}
request.parameters.headers.entries.foreach {
case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value))
}
request.parameters.cookies.entries.foreach {
case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build(
URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build())
}

request.body match {
case HttpBody.Empty =>
case tb: HttpBody.Textual =>
httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset)))
case bb: HttpBody.Binary =>
httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes))
}
httpReq.path(path)
request.parameters.query.entries.foreach {
case (name, PlainValue(value)) => httpReq.param(name, value)
}
request.parameters.headers.entries.foreach {
case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value))
}
request.parameters.cookies.entries.foreach {
case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build(
URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build())
}

timeout match {
case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit)
case _ =>
}
request.body match {
case HttpBody.Empty =>
case tb: HttpBody.Textual =>
httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset)))
case bb: HttpBody.Binary =>
httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes))
}

httpReq.send(new BufferingResponseListener(maxResponseLength) {
override def onComplete(result: Result): Unit =
if (result.isSucceeded) {
val httpResp = result.getResponse
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
val body = (contentTypeOpt, charsetOpt) match {
case (Opt(contentType), Opt(charset)) =>
HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset)
case (Opt(contentType), Opt.Empty) =>
HttpBody.binary(getContent, contentType)
case _ =>
HttpBody.Empty
}
val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList
val response = RestResponse(httpResp.getStatus, IMapping(headers), body)
callback(Success(response))
} else {
callback(Failure(result.getFailure))
timeout match {
case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit)
case _ =>
}
})

httpReq.send(new BufferingResponseListener(maxResponseLength) {
override def onComplete(result: Result): Unit =
if (result.isSucceeded) {
val httpResp = result.getResponse
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
val body = (contentTypeOpt, charsetOpt) match {
case (Opt(contentType), Opt(charset)) =>
HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset)
case (Opt(contentType), Opt.Empty) =>
HttpBody.binary(getContent, contentType)
case _ =>
HttpBody.Empty
}
val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList
val response = RestResponse(httpResp.getStatus, IMapping(headers), body)
callback(Success(response))
} else {
callback(Failure(result.getFailure))
}
})
}
.doOnCancel(Task(httpReq.abort(new CancellationException("Request cancelled"))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ import io.udash.rest.{RestApiTestScenarios, ServletBasedRestApiTest}
import org.eclipse.jetty.client.HttpClient

final class JettyRestCallTest extends ServletBasedRestApiTest with RestApiTestScenarios {
val client: HttpClient = new HttpClient
/**
* Similar to the default HttpClient, but with a connection timeout
* significantly exceeding the value of the CallTimeout
*/
val client: HttpClient = new HttpClient() {
setMaxConnectionsPerDestination(MaxConnections)
setIdleTimeout(IdleTimout.toMillis)
}

def clientHandle: HandleRequest =
JettyRestClient.asHandleRequest(client, s"$baseUrl/api", maxPayloadSize)
Expand Down
86 changes: 62 additions & 24 deletions rest/src/test/scala/io/udash/rest/RestApiTest.scala
Original file line number Diff line number Diff line change
@@ -1,30 +1,47 @@
package io.udash
package rest

import com.avsystem.commons._
import cats.implicits.catsSyntaxTuple2Semigroupal
import com.avsystem.commons.*
import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps
import io.udash.rest.raw.RawRest
import io.udash.rest.raw.RawRest.HandleRequest
import io.udash.testing.AsyncUdashSharedTest
import monix.eval.Task
import monix.execution.Scheduler
import org.scalactic.source.Position
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{Assertion, BeforeAndAfterEach}

abstract class RestApiTest extends AnyFunSuite with ScalaFutures {
import scala.concurrent.duration.FiniteDuration

abstract class RestApiTest extends AsyncUdashSharedTest with BeforeAndAfterEach {
implicit def scheduler: Scheduler = Scheduler.global

protected final val MaxConnections: Int = 1 // to timeout quickly
protected final val Connections: Int = 10 // > MaxConnections
protected final val CallTimeout: FiniteDuration = 300.millis // << idle timeout
protected final val IdleTimout: FiniteDuration = CallTimeout * 100

protected val impl: RestTestApi.Impl = new RestTestApi.Impl

override protected def beforeEach(): Unit = {
super.beforeEach()
impl.resetCounter()
}

final val serverHandle: RawRest.HandleRequest =
RawRest.asHandleRequest[RestTestApi](RestTestApi.Impl)
RawRest.asHandleRequest[RestTestApi](impl)

def clientHandle: RawRest.HandleRequest

lazy val proxy: RestTestApi =
RawRest.fromHandleRequest[RestTestApi](clientHandle)

def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Unit =
assert(
call(proxy).wrapToTry.futureValue.map(mkDeep) ==
call(RestTestApi.Impl).catchFailures.wrapToTry.futureValue.map(mkDeep)
)
def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Future[Assertion] =
(call(proxy).wrapToTry, call(impl).catchFailures.wrapToTry).mapN { (proxyResult, implResult) =>
assert(proxyResult.map(mkDeep) == implResult.map(mkDeep))
}

def mkDeep(value: Any): Any = value match {
case arr: Array[_] => IArraySeq.empty[AnyRef] ++ arr.iterator.map(mkDeep)
Expand All @@ -33,62 +50,83 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures {
}

trait RestApiTestScenarios extends RestApiTest {
test("trivial GET") {
override implicit val patienceConfig: PatienceConfig = PatienceConfig(scaled(Span(10, Seconds)), scaled(Span(50, Millis)))

"trivial GET" in {
testCall(_.trivialGet)
}

test("failing GET") {
"failing GET" in {
testCall(_.failingGet)
}

test("JSON failing GET") {
"JSON failing GET" in {
testCall(_.jsonFailingGet)
}

test("more failing GET") {
"more failing GET" in {
testCall(_.moreFailingGet)
}

test("complex GET") {
"complex GET" in {
testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt(3), 4, "ó /&f"))
testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt.Empty, 3, "ó /&f"))
}

test("multi-param body POST") {
"multi-param body POST" in {
testCall(_.multiParamPost(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", 3, "l\"l"))
}

test("single body PUT") {
"single body PUT" in {
testCall(_.singleBodyPut(RestEntity(RestEntityId("id"), "señor")))
}

test("form POST") {
"form POST" in {
testCall(_.formPost("ó", "ą=ę", 42))
}

test("prefixed GET") {
"prefixed GET" in {
testCall(_.prefix("p0", "h0", "q0").subget(0, 1, 2))
}

test("transparent prefix GET") {
"transparent prefix GET" in {
testCall(_.transparentPrefix.subget(0, 1, 2))
}

test("custom response with headers") {
"custom response with headers" in {
testCall(_.customResponse("walue"))
}

test("binary request and response") {
"binary request and response" in {
testCall(_.binaryEcho(Array.fill[Byte](5)(5)))
}

test("large binary request and response") {
"large binary request and response" in {
testCall(_.binaryEcho(Array.fill[Byte](1024 * 1024)(5)))
}

test("body using third party type") {
"body using third party type" in {
testCall(_.thirdPartyBody(HasThirdParty(ThirdParty(5))))
}

"close connection on monix task timeout" in {
Task
.traverse(List.range(0, Connections))(_ => Task.deferFuture(proxy.neverGet).timeout(CallTimeout).failed)
.map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times
.runToFuture
}

"close connection on monix task cancellation" in {
Task
.traverse(List.range(0, Connections)) { i =>
val cancelable = Task.deferFuture(proxy.neverGet).runAsync(_ => ())
Task.sleep(100.millis)
.restartUntil(_ => impl.counterValue() >= i)
.map(_ => cancelable.cancel())
}
.map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times
.runToFuture
}
}

class DirectRestApiTest extends RestApiTestScenarios {
Expand Down
Loading

0 comments on commit e991b56

Please sign in to comment.