Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ExecutionContexts.sameThreadExecutionContext with parasitic #390

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private[http] final class PoolMasterActor extends Actor with ActorLogging {
// has completed.
val completed = pool.shutdown()(context.dispatcher)
shutdownCompletedPromise.tryCompleteWith(
completed.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext))
completed.map(_ => Done)(ExecutionContexts.parasitic))
statusById += poolId -> PoolInterfaceShuttingDown(shutdownCompletedPromise)
case Some(PoolInterfaceShuttingDown(formerPromise)) =>
// Pool is already shutting down, mirror the existing promise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@
OptionVal.Some(Event.onPreConnect)
}

/** Run a loop of state transitions */

Check warning on line 377 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala

View workflow job for this annotation

GitHub Actions / validate-links

discarding unmoored doc comment
/* @tailrec (does not work for some reason?) */
def loop[U](event: Event[U], arg: U, remainingIterations: Int): Unit =
if (remainingIterations > 0)
Expand Down Expand Up @@ -469,7 +469,7 @@
entityComplete.onComplete(safely {
case Success(_) => withSlot(_.onRequestEntityCompleted())
case Failure(cause) => withSlot(_.onRequestEntityFailed(cause))
})(ExecutionContexts.sameThreadExecutionContext)
})(ExecutionContexts.parasitic)
request.withEntity(newEntity)
}

Expand Down Expand Up @@ -524,9 +524,9 @@
ongoingResponseEntity = None
ongoingResponseEntityKillSwitch = None
}
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
case Failure(_) => throw new IllegalStateException("Should never fail")
})(ExecutionContexts.sameThreadExecutionContext)
})(ExecutionContexts.parasitic)

withSlot(_.onResponseReceived(response.withEntity(newEntity)))
}
Expand Down Expand Up @@ -609,7 +609,7 @@
onConnectionAttemptFailed(currentEmbargoLevel)
sl.onConnectionAttemptFailed(cause)
}
})(ExecutionContexts.sameThreadExecutionContext)
})(ExecutionContexts.parasitic)

slotCon
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) =>
Done
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ private[http] object HttpServerBluePrint {
log.error(ex,
s"Response stream for [${requestStart.debugString}] failed with '${ex.getMessage}'. Aborting connection.")
case _ => // ignore
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
newEntity
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private[http] object StreamUtils {
killResult.future.value match {
case Some(res) => handleKill(res)
case None =>
killResult.future.onComplete(killCallback.invoke)(ExecutionContexts.sameThreadExecutionContext)
killResult.future.onComplete(killCallback.invoke)(ExecutionContexts.parasitic)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
}

class Http(system: ExtendedActorSystem) extends pekko.actor.Extension {
import pekko.dispatch.ExecutionContexts.{ sameThreadExecutionContext => ec }
import pekko.dispatch.ExecutionContexts.{ parasitic => ec }

import language.implicitConversions
private implicit def completionStageCovariant[T, U >: T](in: CompletionStage[T]): CompletionStage[U] =
Expand Down Expand Up @@ -475,7 +475,7 @@
case https: HttpsConnectionContext =>
delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)
case _ =>

Check warning on line 478 in http-core/src/main/scala/org/apache/pekko/http/javadsl/Http.scala

View workflow job for this annotation

GitHub Actions / Compile and test (3.3, 8)

Unreachable case except for null (if this is intentional, consider writing case null => instead).

Check warning on line 478 in http-core/src/main/scala/org/apache/pekko/http/javadsl/Http.scala

View workflow job for this annotation

GitHub Actions / Compile and test (3.3, 11)

Unreachable case except for null (if this is intentional, consider writing case null => instead).
delegate.newHostConnectionPool[T](to.host, to.port, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class ServerBinding private[http] (delegate: pekko.http.scaladsl.Http.ServerBind

def terminate(hardDeadline: java.time.Duration): CompletionStage[HttpTerminated] = {
delegate.terminate(FiniteDuration.apply(hardDeadline.toMillis, TimeUnit.MILLISECONDS))
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.sameThreadExecutionContext)
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.parasitic)
.asJava
}

Expand All @@ -103,7 +103,7 @@ class ServerBinding private[http] (delegate: pekko.http.scaladsl.Http.ServerBind
*/
def whenTerminationSignalIssued: CompletionStage[java.time.Duration] =
delegate.whenTerminationSignalIssued
.map(deadline => deadline.time.asJava)(ExecutionContexts.sameThreadExecutionContext)
.map(deadline => deadline.time.asJava)(ExecutionContexts.parasitic)
.asJava

/**
Expand All @@ -120,7 +120,7 @@ class ServerBinding private[http] (delegate: pekko.http.scaladsl.Http.ServerBind
*/
def whenTerminated: CompletionStage[HttpTerminated] =
delegate.whenTerminated
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.sameThreadExecutionContext)
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.parasitic)
.asJava

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,6 @@ object ServerBuilder {
def connectionSource(): Source[IncomingConnection, CompletionStage[ServerBinding]] =
http.bindImpl(interface, port, context.asScala, settings.asScala, log)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ExecutionContexts.sameThreadExecutionContext).asJava).asJava
.mapMaterializedValue(_.map(new ServerBinding(_))(ExecutionContexts.parasitic).asJava).asJava
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
.watchTermination() { (termWatchBefore, termWatchAfter) =>
// flag termination when the user handler has gotten (or has emitted) termination
// signals in both directions
termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContexts.sameThreadExecutionContext)
termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContexts.parasitic)
}
.joinMat(baseFlow)(Keep.both))

Expand Down Expand Up @@ -189,7 +189,7 @@
@deprecated(
"Use Http().newServerAt(...)...connectionSource() to create a source that can be materialized to a binding.",
since = "Akka HTTP 10.2.0")
@nowarn("msg=deprecated")

Check warning on line 192 in http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala

View workflow job for this annotation

GitHub Actions / validate-links

@nowarn annotation does not suppress any warnings

Check warning on line 192 in http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 11)

@nowarn annotation does not suppress any warnings

Check warning on line 192 in http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 8)

@nowarn annotation does not suppress any warnings
def bind(interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext = defaultServerHttpContext,
settings: ServerSettings = ServerSettings(system),
Expand Down Expand Up @@ -238,7 +238,7 @@
* use the `pekko.http.server` config section or pass in a [[pekko.http.scaladsl.settings.ServerSettings]] explicitly.
*/
@deprecated("Use Http().newServerAt(...)...bindFlow() to create server bindings.", since = "Akka HTTP 10.2.0")
@nowarn("msg=deprecated")

Check warning on line 241 in http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 11)

@nowarn annotation does not suppress any warnings

Check warning on line 241 in http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 8)

@nowarn annotation does not suppress any warnings
def bindAndHandle(
handler: Flow[HttpRequest, HttpResponse, Any],
interface: String, port: Int = DefaultPortForProtocol,
Expand Down Expand Up @@ -280,7 +280,7 @@
// from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) => Done
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
Expand Down Expand Up @@ -833,7 +833,7 @@
val parallelism = settings.pipeliningLimit * settings.maxConnections
Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism) {
case (request, userContext) => poolInterface(request).transform(response => Success(response -> userContext))(
ExecutionContexts.sameThreadExecutionContext)
ExecutionContexts.parasitic)
}
}

Expand Down Expand Up @@ -948,7 +948,7 @@
* Note: rather than unbinding explicitly you can also use [[addToCoordinatedShutdown]] to add this task to Akka's coordinated shutdown.
*/
def unbind(): Future[Done] =
unbindAction().map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)
unbindAction().map(_ => Done)(ExecutionContexts.parasitic)

/**
* Triggers "graceful" termination request being handled on this connection.
Expand Down Expand Up @@ -998,7 +998,7 @@

_whenTerminationSignalIssued.trySuccess(hardDeadline.fromNow)
val terminated =
unbindAction().flatMap(_ => terminateAction(hardDeadline))(ExecutionContexts.sameThreadExecutionContext)
unbindAction().flatMap(_ => terminateAction(hardDeadline))(ExecutionContexts.parasitic)
_whenTerminated.completeWith(terminated)
whenTerminated
}
Expand Down Expand Up @@ -1039,7 +1039,7 @@
unbind()
}
shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, s"http-terminate-${localAddress}") { () =>
terminate(hardTerminationDeadline).map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)
terminate(hardTerminationDeadline).map(_ => Done)(ExecutionContexts.parasitic)
}
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class WebSocketIntegrationSpec extends PekkoSpecWithMaterializer(

override def preStart(): Unit = {
promise.future.foreach(_ => getAsyncCallback[Done](_ => complete(shape.out)).invoke(Done))(
pekko.dispatch.ExecutionContexts.sameThreadExecutionContext)
pekko.dispatch.ExecutionContexts.parasitic)
}

setHandlers(shape.in, shape.out, this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,18 @@ class RequestContext private (val delegate: scaladsl.server.RequestContext) {

def complete[T](value: T, marshaller: Marshaller[T, HttpResponse]): CompletionStage[RouteResult] = {
delegate.complete(ToResponseMarshallable(value)(marshaller))
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.sameThreadExecutionContext).asJava
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.parasitic).asJava
}

def completeWith(response: HttpResponse): CompletionStage[RouteResult] = {
delegate.complete(response.asScala)
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.sameThreadExecutionContext).asJava
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.parasitic).asJava
}

@varargs def reject(rejections: Rejection*): CompletionStage[RouteResult] = {
val scalaRejections = rejections.map(_.asScala)
delegate.reject(scalaRejections: _*)
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.sameThreadExecutionContext).asJava
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.parasitic).asJava
}

def redirect(uri: Uri, redirectionType: StatusCode): CompletionStage[RouteResult] = {
Expand All @@ -80,7 +80,7 @@ class RequestContext private (val delegate: scaladsl.server.RequestContext) {

def fail(error: Throwable): CompletionStage[RouteResult] =
delegate.fail(error)
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.sameThreadExecutionContext).asJava
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.parasitic).asJava

def withRequest(req: HttpRequest): RequestContext = wrap(delegate.withRequest(req.asScala))
def withExecutionContext(ec: ExecutionContextExecutor): RequestContext = wrap(delegate.withExecutionContext(ec))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,24 @@ abstract class BasicDirectives {
inner: Supplier[Route]): Route = RouteAdapter {
D.mapRouteResultFuture(stage =>
FutureConverters.asScala(
f(stage.fast.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).asJava)).fast.map(_.asScala)(
ExecutionContexts.sameThreadExecutionContext)) {
f(stage.fast.map(_.asJava)(ExecutionContexts.parasitic).asJava)).fast.map(_.asScala)(
ExecutionContexts.parasitic)) {
inner.get.delegate
}
}

def mapRouteResultWith(f: JFunction[RouteResult, CompletionStage[RouteResult]], inner: Supplier[Route]): Route =
RouteAdapter {
D.mapRouteResultWith(r =>
FutureConverters.asScala(f(r.asJava)).fast.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) {
FutureConverters.asScala(f(r.asJava)).fast.map(_.asScala)(ExecutionContexts.parasitic)) {
inner.get.delegate
}
}

def mapRouteResultWithPF(
f: PartialFunction[RouteResult, CompletionStage[RouteResult]], inner: Supplier[Route]): Route = RouteAdapter {
D.mapRouteResultWith(r =>
FutureConverters.asScala(f(r.asJava)).fast.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) {
FutureConverters.asScala(f(r.asJava)).fast.map(_.asScala)(ExecutionContexts.parasitic)) {
inner.get.delegate
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ abstract class BasicDirectives {
f: JFunction[JIterable[Rejection], CompletionStage[RouteResult]], inner: Supplier[Route]): Route = RouteAdapter {
D.recoverRejectionsWith(rs =>
FutureConverters.asScala(f.apply(Util.javaArrayList(rs.map(_.asJava)))).fast.map(_.asScala)(
ExecutionContexts.sameThreadExecutionContext)) { inner.get.delegate }
ExecutionContexts.parasitic)) { inner.get.delegate }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class RouteDirectives extends RespondWithDirectives {
import RoutingJavaMapping.Implicits._

// Don't try this at home – we only use it here for the java -> scala conversions
private implicit val conversionExecutionContext: ExecutionContext = ExecutionContexts.sameThreadExecutionContext
private implicit val conversionExecutionContext: ExecutionContext = ExecutionContexts.parasitic

/**
* Java-specific call added so you can chain together multiple alternate routes using comma,
Expand Down
Loading