Skip to content

Commit

Permalink
Update exceptions from AbstractNoHandleSubscribe sources (#3193)
Browse files Browse the repository at this point in the history
Motivation:

`AbstractNoHandleSubscribe[Completable|Single|Publisher]` were used in
the past not only to enforce non-standard context propagation but also
an `Executor`. Then in 2020 we removed an `Executor` reference from
operators chain, but did not reword these exceptions.

Modifications:

- Align exception type to always fail subscriber with
`UnsupportedOperationException`.
- Align exception message to clarify it's intended to target
`CapturedContext` instead of `Executor`.
- Simplify `LiftSynchronousPublisherToSingle` by extending
`AbstractNoHandleSubscribeSingle`.

Result:

Updated and consistent behavior of `AbstractNoHandleSubscribe` sources.
  • Loading branch information
idelpivnitskiy authored Feb 19, 2025
1 parent 1af323e commit 2a09aa2
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,19 @@
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.context.api.ContextMap;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;

/**
* A {@link Completable} that does not expect to receive a call to {@link #handleSubscribe(Subscriber)} since it
* overrides {@link Completable#handleSubscribe(Subscriber, ContextMap, AsyncContextProvider)}.
* overrides {@link Completable#handleSubscribe(Subscriber, CapturedContext, AsyncContextProvider)}.
*/
abstract class AbstractNoHandleSubscribeCompletable extends Completable implements CompletableSource {

@Override
protected final void handleSubscribe(Subscriber subscriber) {
deliverErrorFromSource(subscriber,
new UnsupportedOperationException("Subscribe with no executor is not supported for " + getClass()));
protected final void handleSubscribe(final Subscriber subscriber) {
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.RejectedSubscribeException;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;

Expand All @@ -29,9 +28,9 @@
abstract class AbstractNoHandleSubscribePublisher<T> extends Publisher<T> implements PublisherSource<T> {

@Override
protected final void handleSubscribe(Subscriber<? super T> subscriber) {
deliverErrorFromSource(subscriber,
new RejectedSubscribeException("Subscribe with no executor is not supported for " + getClass()));
protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.context.api.ContextMap;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;

/**
* A {@link Single} that does not expect to receive a call to {@link #handleSubscribe(Subscriber)} since it overrides
* {@link Single#handleSubscribe(Subscriber, ContextMap, AsyncContextProvider)}.
* {@link Single#handleSubscribe(Subscriber, CapturedContext, AsyncContextProvider)}.
*
* @param <T> Type of the result of the single.
*/
abstract class AbstractNoHandleSubscribeSingle<T> extends Single<T> implements SingleSource<T> {

@Override
protected final void handleSubscribe(Subscriber<? super T> subscriber) {
deliverErrorFromSource(subscriber,
new UnsupportedOperationException("Subscribe with no executor is not supported for " + getClass()));
protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.SingleSource;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
import static java.util.Objects.requireNonNull;

final class LiftSynchronousPublisherToSingle<T, R> extends Single<R> implements SingleSource<R> {
final class LiftSynchronousPublisherToSingle<T, R> extends AbstractNoHandleSubscribeSingle<R> {
private final Publisher<T> original;
private final PublisherToSingleOperator<? super T, ? extends R> customOperator;

Expand All @@ -30,17 +27,6 @@ final class LiftSynchronousPublisherToSingle<T, R> extends Single<R> implements
this.customOperator = requireNonNull(customOperator);
}

@Override
protected void handleSubscribe(final SingleSource.Subscriber<? super R> subscriber) {
deliverErrorFromSource(subscriber,
new UnsupportedOperationException("Subscribe with no executor is not supported for " + getClass()));
}

@Override
public void subscribe(final SingleSource.Subscriber<? super R> subscriber) {
subscribeInternal(subscriber);
}

@Override
void handleSubscribe(final Subscriber<? super R> subscriber,
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
Expand Down

0 comments on commit 2a09aa2

Please sign in to comment.