Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the firstAndTail operator to the public API #3186

Merged
merged 13 commits into from
Feb 14, 2025

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.BufferStrategy.Accumulator;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.context.api.ContextMap;

import org.slf4j.Logger;
Expand Down Expand Up @@ -2664,6 +2665,41 @@ public final Publisher<T> skipWhile(Predicate<? super T> predicate) {
return filter(FilterPublisher.skipWhileSupplier(predicate));
}

/**
* Converts this {@link Publisher} to a {@link Single} that will contain the first element of this {@link Publisher}
* and a {@link Publisher} representing the remainder of the stream, to a mapping function.
* <p>
* Note that either the packer function itself or any operator following this one MUST eventually take care of the
* tail {@link Publisher} or they risk leaking resources or attached callbacks on the original Publisher. This
* includes cases where an exception is thrown instead of returning the packed object or a failed Single is
* returned. In addition, the tail may only be subscribed to exactly one time. Subsequent {@link Subscriber}s will
* receive {@link DuplicateSubscribeException}, even if the original publisher supports re-subscribes.
* <pre>{@code
* class Result {
* Result(T head, Iterator<T> tail) {
* this.head = head;
* this.tail = tail;
* }
*
* T head;
* Iterator<T> tail;
* }
*
* Iterator<T> itr = resultOfThisPublisher();
* Result result = new Result(itr.next(), itr);
* return result;
* }</pre>
*
* @param packer A function that takes the head of the input stream and processes it, along with a {@link Publisher}
* of the remainder of the stream.
* @param <R> The resulting type of the packer operation.
* @return A {@link Single} containing the packed object that is a result of calling packer on the
* head and tail of the stream.
*/
public final <R> Single<R> firstAndTail(BiFunction<T, Publisher<T>, R> packer) {
return this.liftSyncToSingle(new FirstAndTailToPackedSingle<>(packer));
}

/**
* Takes at most {@code numElements} elements from {@code this} {@link Publisher}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright © 2025 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.internal.ThrowableUtils;

import java.util.concurrent.CancellationException;

final class StacklessCancellationException extends CancellationException {
private static final long serialVersionUID = 253828407685585783L;

private StacklessCancellationException(String message) {
super(message);
}

@Override
public Throwable fillInStackTrace() {
return this;
}

static StacklessCancellationException newInstance(String message, Class<?> clazz, String method) {
return ThrowableUtils.unknownStackTrace(new StacklessCancellationException(message), clazz, method);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;
package io.servicetalk.concurrent.reactivestreams.tck;

import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.reactivestreams.tck.AbstractPublisherOperatorTckTest;

import org.testng.annotations.Test;

import static io.servicetalk.concurrent.api.Publisher.from;

@Test
public class SpliceFlatStreamToMetaSingleTckTest extends AbstractPublisherOperatorTckTest<Integer> {
public class FirstAndTailToPackedSingleTckTest extends AbstractPublisherOperatorTckTest<Integer> {

@Override
protected Publisher<Integer> composePublisher(final Publisher<Integer> publisher, final int elements) {
Publisher<Integer> composed = publisher.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Result::new))
Publisher<Integer> composed = publisher.firstAndTail(Result::new)
.flatMapPublisher(result -> from(result.first).concat(result.following));
// For TCK only, convert an error the splice generates for an empty stream to onComplete():
return elements == 0 ? composed.onErrorComplete(IllegalStateException.class) : composed;
Expand Down
1 change: 0 additions & 1 deletion servicetalk-http-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ dependencies {
testImplementation testFixtures(project(":servicetalk-buffer-api"))
testImplementation testFixtures(project(":servicetalk-concurrent-api"))
testImplementation testFixtures(project(":servicetalk-concurrent-internal"))
testImplementation testFixtures(project(":servicetalk-concurrent-reactivestreams"))
testImplementation testFixtures(project(":servicetalk-http-api"))
testImplementation testFixtures(project(":servicetalk-log4j2-mdc-utils"))
testImplementation testFixtures(project(":servicetalk-transport-netty-internal"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void cancel() {
}
}
})
.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(this::newSplicedResponse));
.firstAndTail(this::newSplicedResponse);
}

@Override
Expand Down Expand Up @@ -246,9 +246,10 @@ public final HttpExecutionContext executionContext() {
protected abstract Publisher<Object> writeAndRead(Publisher<Object> stream,
@Nullable FlushStrategy flushStrategy);

private StreamingHttpResponse newSplicedResponse(HttpResponseMetaData meta, Publisher<Object> pub) {
private StreamingHttpResponse newSplicedResponse(Object head, Publisher<Object> tail) {
HttpResponseMetaData meta = (HttpResponseMetaData) head;
return newTransportResponse(meta.status(), meta.version(), meta.headers(),
connectionContext.executionContext().bufferAllocator(), pub,
connectionContext.executionContext().bufferAllocator(), tail,
allowDropTrailersReadFromTransport, headersFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,12 @@ static final class NettyHttpServerConnection extends HttpServiceContext implemen

void process(final boolean handleMultipleRequests) {
final Single<StreamingHttpRequest> requestSingle =
connection.read().liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(
(HttpRequestMetaData meta, Publisher<Object> payload) ->
newTransportRequest(meta.method(), meta.requestTarget(), meta.version(),
meta.headers(), executionContext().bufferAllocator(), payload,
requireTrailerHeader, headersFactory)));
connection.read().firstAndTail((head, payload) -> {
HttpRequestMetaData meta = (HttpRequestMetaData) head;
return newTransportRequest(meta.method(), meta.requestTarget(), meta.version(),
meta.headers(), executionContext().bufferAllocator(), payload,
requireTrailerHeader, headersFactory);
});
toSource(handleRequestAndWriteResponse(requestSingle, handleMultipleRequests))
.subscribe(new ErrorLoggingHttpSubscriber(this));
}
Expand Down
Loading