Skip to content

Commit

Permalink
Replace ExecutionContexts.sameThreadExecutionContext with parasitic
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Dec 31, 2023
1 parent 7336f7c commit d8d347b
Show file tree
Hide file tree
Showing 13 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private[http] final class PoolMasterActor extends Actor with ActorLogging {
// has completed.
val completed = pool.shutdown()(context.dispatcher)
shutdownCompletedPromise.tryCompleteWith(
completed.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext))
completed.map(_ => Done)(ExecutionContexts.parasitic))
statusById += poolId -> PoolInterfaceShuttingDown(shutdownCompletedPromise)
case Some(PoolInterfaceShuttingDown(formerPromise)) =>
// Pool is already shutting down, mirror the existing promise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ private[client] object NewHostConnectionPool {
entityComplete.onComplete(safely {
case Success(_) => withSlot(_.onRequestEntityCompleted())
case Failure(cause) => withSlot(_.onRequestEntityFailed(cause))
})(ExecutionContexts.sameThreadExecutionContext)
})(ExecutionContexts.parasitic)
request.withEntity(newEntity)
}

Expand Down Expand Up @@ -524,9 +524,9 @@ private[client] object NewHostConnectionPool {
ongoingResponseEntity = None
ongoingResponseEntityKillSwitch = None
}
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
case Failure(_) => throw new IllegalStateException("Should never fail")
})(ExecutionContexts.sameThreadExecutionContext)
})(ExecutionContexts.parasitic)

withSlot(_.onResponseReceived(response.withEntity(newEntity)))
}
Expand Down Expand Up @@ -609,7 +609,7 @@ private[client] object NewHostConnectionPool {
onConnectionAttemptFailed(currentEmbargoLevel)
sl.onConnectionAttemptFailed(cause)
}
})(ExecutionContexts.sameThreadExecutionContext)
})(ExecutionContexts.parasitic)

slotCon
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) =>
Done
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ private[http] object HttpServerBluePrint {
log.error(ex,
s"Response stream for [${requestStart.debugString}] failed with '${ex.getMessage}'. Aborting connection.")
case _ => // ignore
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
newEntity
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private[http] object StreamUtils {
killResult.future.value match {
case Some(res) => handleKill(res)
case None =>
killResult.future.onComplete(killCallback.invoke)(ExecutionContexts.sameThreadExecutionContext)
killResult.future.onComplete(killCallback.invoke)(ExecutionContexts.parasitic)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object Http extends ExtensionId[Http] with ExtensionIdProvider {
}

class Http(system: ExtendedActorSystem) extends pekko.actor.Extension {
import pekko.dispatch.ExecutionContexts.{ sameThreadExecutionContext => ec }
import pekko.dispatch.ExecutionContexts.{ parasitic => ec }

import language.implicitConversions
private implicit def completionStageCovariant[T, U >: T](in: CompletionStage[T]): CompletionStage[U] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class ServerBinding private[http] (delegate: pekko.http.scaladsl.Http.ServerBind

def terminate(hardDeadline: java.time.Duration): CompletionStage[HttpTerminated] = {
delegate.terminate(FiniteDuration.apply(hardDeadline.toMillis, TimeUnit.MILLISECONDS))
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.sameThreadExecutionContext)
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.parasitic)
.asJava
}

Expand All @@ -103,7 +103,7 @@ class ServerBinding private[http] (delegate: pekko.http.scaladsl.Http.ServerBind
*/
def whenTerminationSignalIssued: CompletionStage[java.time.Duration] =
delegate.whenTerminationSignalIssued
.map(deadline => deadline.time.asJava)(ExecutionContexts.sameThreadExecutionContext)
.map(deadline => deadline.time.asJava)(ExecutionContexts.parasitic)
.asJava

/**
Expand All @@ -120,7 +120,7 @@ class ServerBinding private[http] (delegate: pekko.http.scaladsl.Http.ServerBind
*/
def whenTerminated: CompletionStage[HttpTerminated] =
delegate.whenTerminated
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.sameThreadExecutionContext)
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.parasitic)
.asJava

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,6 @@ object ServerBuilder {
def connectionSource(): Source[IncomingConnection, CompletionStage[ServerBinding]] =
http.bindImpl(interface, port, context.asScala, settings.asScala, log)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ExecutionContexts.sameThreadExecutionContext).asJava).asJava
.mapMaterializedValue(_.map(new ServerBinding(_))(ExecutionContexts.parasitic).asJava).asJava
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
.watchTermination() { (termWatchBefore, termWatchAfter) =>
// flag termination when the user handler has gotten (or has emitted) termination
// signals in both directions
termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContexts.sameThreadExecutionContext)
termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContexts.parasitic)
}
.joinMat(baseFlow)(Keep.both))

Expand Down Expand Up @@ -280,7 +280,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
// from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) => Done
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
Expand Down Expand Up @@ -833,7 +833,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
val parallelism = settings.pipeliningLimit * settings.maxConnections
Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism) {
case (request, userContext) => poolInterface(request).transform(response => Success(response -> userContext))(
ExecutionContexts.sameThreadExecutionContext)
ExecutionContexts.parasitic)
}
}

Expand Down Expand Up @@ -948,7 +948,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
* Note: rather than unbinding explicitly you can also use [[addToCoordinatedShutdown]] to add this task to Akka's coordinated shutdown.
*/
def unbind(): Future[Done] =
unbindAction().map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)
unbindAction().map(_ => Done)(ExecutionContexts.parasitic)

/**
* Triggers "graceful" termination request being handled on this connection.
Expand Down Expand Up @@ -998,7 +998,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {

_whenTerminationSignalIssued.trySuccess(hardDeadline.fromNow)
val terminated =
unbindAction().flatMap(_ => terminateAction(hardDeadline))(ExecutionContexts.sameThreadExecutionContext)
unbindAction().flatMap(_ => terminateAction(hardDeadline))(ExecutionContexts.parasitic)
_whenTerminated.completeWith(terminated)
whenTerminated
}
Expand Down Expand Up @@ -1039,7 +1039,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
unbind()
}
shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, s"http-terminate-${localAddress}") { () =>
terminate(hardTerminationDeadline).map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)
terminate(hardTerminationDeadline).map(_ => Done)(ExecutionContexts.parasitic)
}
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class WebSocketIntegrationSpec extends PekkoSpecWithMaterializer(

override def preStart(): Unit = {
promise.future.foreach(_ => getAsyncCallback[Done](_ => complete(shape.out)).invoke(Done))(
pekko.dispatch.ExecutionContexts.sameThreadExecutionContext)
pekko.dispatch.ExecutionContexts.parasitic)
}

setHandlers(shape.in, shape.out, this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,18 @@ class RequestContext private (val delegate: scaladsl.server.RequestContext) {

def complete[T](value: T, marshaller: Marshaller[T, HttpResponse]): CompletionStage[RouteResult] = {
delegate.complete(ToResponseMarshallable(value)(marshaller))
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.sameThreadExecutionContext).asJava
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.parasitic).asJava
}

def completeWith(response: HttpResponse): CompletionStage[RouteResult] = {
delegate.complete(response.asScala)
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.sameThreadExecutionContext).asJava
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.parasitic).asJava
}

@varargs def reject(rejections: Rejection*): CompletionStage[RouteResult] = {
val scalaRejections = rejections.map(_.asScala)
delegate.reject(scalaRejections: _*)
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.sameThreadExecutionContext).asJava
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.parasitic).asJava
}

def redirect(uri: Uri, redirectionType: StatusCode): CompletionStage[RouteResult] = {
Expand All @@ -80,7 +80,7 @@ class RequestContext private (val delegate: scaladsl.server.RequestContext) {

def fail(error: Throwable): CompletionStage[RouteResult] =
delegate.fail(error)
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.sameThreadExecutionContext).asJava
.fast.map(r => r: RouteResult)(pekko.dispatch.ExecutionContexts.parasitic).asJava

def withRequest(req: HttpRequest): RequestContext = wrap(delegate.withRequest(req.asScala))
def withExecutionContext(ec: ExecutionContextExecutor): RequestContext = wrap(delegate.withExecutionContext(ec))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,24 @@ abstract class BasicDirectives {
inner: Supplier[Route]): Route = RouteAdapter {
D.mapRouteResultFuture(stage =>
FutureConverters.asScala(
f(stage.fast.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).asJava)).fast.map(_.asScala)(
ExecutionContexts.sameThreadExecutionContext)) {
f(stage.fast.map(_.asJava)(ExecutionContexts.parasitic).asJava)).fast.map(_.asScala)(
ExecutionContexts.parasitic)) {
inner.get.delegate
}
}

def mapRouteResultWith(f: JFunction[RouteResult, CompletionStage[RouteResult]], inner: Supplier[Route]): Route =
RouteAdapter {
D.mapRouteResultWith(r =>
FutureConverters.asScala(f(r.asJava)).fast.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) {
FutureConverters.asScala(f(r.asJava)).fast.map(_.asScala)(ExecutionContexts.parasitic)) {
inner.get.delegate
}
}

def mapRouteResultWithPF(
f: PartialFunction[RouteResult, CompletionStage[RouteResult]], inner: Supplier[Route]): Route = RouteAdapter {
D.mapRouteResultWith(r =>
FutureConverters.asScala(f(r.asJava)).fast.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) {
FutureConverters.asScala(f(r.asJava)).fast.map(_.asScala)(ExecutionContexts.parasitic)) {
inner.get.delegate
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ abstract class BasicDirectives {
f: JFunction[JIterable[Rejection], CompletionStage[RouteResult]], inner: Supplier[Route]): Route = RouteAdapter {
D.recoverRejectionsWith(rs =>
FutureConverters.asScala(f.apply(Util.javaArrayList(rs.map(_.asJava)))).fast.map(_.asScala)(
ExecutionContexts.sameThreadExecutionContext)) { inner.get.delegate }
ExecutionContexts.parasitic)) { inner.get.delegate }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class RouteDirectives extends RespondWithDirectives {
import RoutingJavaMapping.Implicits._

// Don't try this at home – we only use it here for the java -> scala conversions
private implicit val conversionExecutionContext: ExecutionContext = ExecutionContexts.sameThreadExecutionContext
private implicit val conversionExecutionContext: ExecutionContext = ExecutionContexts.parasitic

/**
* Java-specific call added so you can chain together multiple alternate routes using comma,
Expand Down

0 comments on commit d8d347b

Please sign in to comment.