package io.servicetalk.http.netty;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.BlockingIterator;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.BlockingStreamingHttpClient;
import io.servicetalk.http.api.BlockingStreamingHttpResponse;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpSerializers;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;
import io.servicetalk.test.resources.TestUtils;
import io.servicetalk.transport.netty.internal.AddressUtils;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import javax.annotation.Nullable;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

/* loaded from: input_file:io/servicetalk/http/netty/AsyncContextHttpFilterVerifier.class */
public final class AsyncContextHttpFilterVerifier {
    /** First context key ({@code "k1"}) and the value the verifier stores under it. */
    public static final ContextMap.Key<String> K1 = ContextMap.Key.newKey("k1", String.class);
    public static final String V1 = "v1";

    /** Second context key ({@code "k2"}) and the value the verifier stores under it. */
    public static final ContextMap.Key<String> K2 = ContextMap.Key.newKey("k2", String.class);
    public static final String V2 = "v2";

    /** Third context key ({@code "k3"}) and the value the verifier stores under it. */
    public static final ContextMap.Key<String> K3 = ContextMap.Key.newKey("k3", String.class);
    public static final String V3 = "v3";

    /* loaded from: input_file:io/servicetalk/http/netty/AsyncContextHttpFilterVerifier$AsyncContextAssertionFilter.class */
    /**
     * Service filter factory that checks the values of {@code K1}, {@code K2} and {@code K3} in the
     * {@link AsyncContext} at several points while a request is handled. Every mismatch is appended to
     * the error queue supplied at construction instead of being thrown.
     */
    public static final class AsyncContextAssertionFilter implements StreamingHttpServiceFilterFactory {
        /** Receives one {@link AssertionError} per unexpected context value. */
        final Queue<Throwable> errorQueue;
        /** When {@code true}, {@code K1} is expected to hold {@code V1} once the request body terminates. */
        private final boolean lazyPayload;
        /** When {@code true}, {@code K2} is expected to hold {@code V2} from response meta-data onwards. */
        private final boolean hasK2;
        /** When {@code true}, {@code K3} is expected to hold {@code V3} when the response terminates. */
        private final boolean hasK3;

        /**
         * Creates a filter with all three expectation flags set to {@code true}.
         *
         * @param queue destination for assertion failures
         */
        public AsyncContextAssertionFilter(Queue<Throwable> queue) {
            this(queue, true, true, true);
        }

        /**
         * Creates a filter with explicit expectations.
         *
         * @param queue destination for assertion failures
         * @param z     stored as {@code lazyPayload}
         * @param z2    stored as {@code hasK2}
         * @param z3    stored as {@code hasK3}
         */
        public AsyncContextAssertionFilter(Queue<Throwable> queue, boolean z, boolean z2, boolean z3) {
            errorQueue = queue;
            lazyPayload = z;
            hasK2 = z2;
            hasK3 = z3;
        }

        /**
         * Wraps the given service with a filter that performs the context checks.
         *
         * @param streamingHttpService the service to delegate to
         * @return the asserting filter
         */
        public StreamingHttpServiceFilter create(StreamingHttpService streamingHttpService) {
            return new StreamingHttpServiceFilter(streamingHttpService) {
                public Single<StreamingHttpResponse> handle(HttpServiceContext httpServiceContext, StreamingHttpRequest streamingHttpRequest, StreamingHttpResponseFactory streamingHttpResponseFactory) {
                    // Before delegating, none of the keys is expected to be present.
                    assertContextValues(null, null, null);

                    // Check again when the request message body terminates.
                    final StreamingHttpRequest observedRequest = streamingHttpRequest.transformMessageBody(
                            body -> body.beforeFinally(() -> assertContextValues(
                                    lazyPayload ? AsyncContextHttpFilterVerifier.V1 : null, null, null)));

                    final Single<StreamingHttpResponse> delegated =
                            super.handle(httpServiceContext, observedRequest, streamingHttpResponseFactory);

                    // Check when the response meta-data is delivered: K3 is not expected yet.
                    final Single<StreamingHttpResponse> checkedOnSuccess = delegated.beforeOnSuccess(
                            response -> assertContextValues(
                                    AsyncContextHttpFilterVerifier.V1,
                                    hasK2 ? AsyncContextHttpFilterVerifier.V2 : null,
                                    null));

                    // Final check, run by BeforeFinallyHttpOperator.
                    final BeforeFinallyHttpOperator finalCheck = new BeforeFinallyHttpOperator(
                            () -> assertContextValues(
                                    AsyncContextHttpFilterVerifier.V1,
                                    hasK2 ? AsyncContextHttpFilterVerifier.V2 : null,
                                    hasK3 ? AsyncContextHttpFilterVerifier.V3 : null));

                    return checkedOnSuccess.liftSync(finalCheck).shareContextOnSubscribe();
                }
            };
        }

        /**
         * Checks {@code K1}, {@code K2} and {@code K3} (in that order) against the expected values and
         * records any mismatch in {@link #errorQueue}.
         */
        private void assertContextValues(@Nullable String expectedK1, @Nullable String expectedK2, @Nullable String expectedK3) {
            AsyncContextHttpFilterVerifier.assertAsyncContext(AsyncContextHttpFilterVerifier.K1, expectedK1, errorQueue);
            AsyncContextHttpFilterVerifier.assertAsyncContext(AsyncContextHttpFilterVerifier.K2, expectedK2, errorQueue);
            AsyncContextHttpFilterVerifier.assertAsyncContext(AsyncContextHttpFilterVerifier.K3, expectedK3, errorQueue);
        }
    }

    /**
     * Private constructor: the class cannot be instantiated from outside.
     */
    private AsyncContextHttpFilterVerifier() {
    }

    /**
     * Starts a local HTTP server whose filter chain is an {@link AsyncContextAssertionFilter} followed by
     * the filter under test, sends a single {@code POST /test} request carrying {@code "Hello World"},
     * and verifies that the response is {@code 200 OK}, that the payload is echoed back as exactly one
     * item, and that no asynchronous assertion error was recorded along the way.
     *
     * @param streamingHttpServiceFilterFactory the service filter under test
     * @throws Exception if starting the server, issuing the request or closing the client fails
     */
    public static void verifyServerFilterAsyncContextVisibility(StreamingHttpServiceFilterFactory streamingHttpServiceFilterFactory) throws Exception {
        final BlockingQueue<Throwable> errors = new LinkedBlockingDeque<>();
        final List<String> payload = Collections.singletonList("Hello World");

        // NOTE(review): the server context returned by listenStreamingAndAwait is still created inline and
        // never closed; closing it needs a named variable of its type, which this file does not import.
        // The client is closed via try-with-resources so its connections are released even when an
        // assertion below fails.
        try (BlockingStreamingHttpClient client = HttpClients.forSingleAddress(AddressUtils.serverHostAndPort(
                HttpServers.forAddress(AddressUtils.localAddress(0))
                        .appendServiceFilter(new AsyncContextAssertionFilter(errors))
                        .appendServiceFilter(streamingHttpServiceFilterFactory)
                        .listenStreamingAndAwait(asyncContextRequestHandler(errors))))
                .buildBlockingStreaming()) {
            final BlockingStreamingHttpResponse response = client.request(
                    client.post("/test").payloadBody(payload, HttpSerializers.appSerializerUtf8FixLen()));
            MatcherAssert.assertThat(response.status(), Matchers.is(HttpResponseStatus.OK));

            // The handler echoes the aggregated request body, so exactly one item is expected.
            final BlockingIterator<String> body =
                    response.payloadBody(HttpSerializers.appSerializerUtf8FixLen()).iterator();
            MatcherAssert.assertThat(body.hasNext(), Matchers.is(true));
            MatcherAssert.assertThat(body.next(), Matchers.is(payload.get(0)));
            MatcherAssert.assertThat(body.hasNext(), Matchers.is(false));

            TestUtils.assertNoAsyncErrors(errors);
        }
    }

    /**
     * Creates the service used by the verifier. The service:
     * <ol>
     *   <li>puts {@code K1=V1} into the {@link AsyncContext} when the request arrives;</li>
     *   <li>aggregates the request payload into a single string, then puts {@code K2=V2} and checks
     *       {@code K1}/{@code K2};</li>
     *   <li>echoes the aggregated string as the response payload, putting {@code K3=V3} while the item
     *       is mapped and checking all three keys;</li>
     *   <li>attaches a subscriber to the response payload that checks the keys on every signal.</li>
     * </ol>
     * Mismatches are recorded in {@code blockingQueue} rather than thrown.
     *
     * @param blockingQueue destination for assertion failures
     * @return the request handler
     */
    private static StreamingHttpService asyncContextRequestHandler(BlockingQueue<Throwable> blockingQueue) {
        return (httpServiceContext, streamingHttpRequest, streamingHttpResponseFactory) -> {
            AsyncContext.put(K1, V1);
            return streamingHttpRequest.payloadBody(HttpSerializers.appSerializerUtf8FixLen())
                    .collect(StringBuilder::new, (collector, chunk) -> {
                        collector.append(chunk);
                        return collector;
                    })
                    .map(StringBuilder::toString)
                    .map(requestBody -> {
                        AsyncContext.put(K2, V2);
                        assertAsyncContext(K1, V1, blockingQueue);
                        assertAsyncContext(K2, V2, blockingQueue);
                        // The inner lambda parameter must not reuse the enclosing lambda's parameter
                        // name: Java rejects a lambda parameter that redeclares an enclosing local.
                        return streamingHttpResponseFactory.ok().payloadBody(Publisher.from(requestBody).map(item -> {
                            AsyncContext.put(K3, V3);
                            assertAsyncContext(K1, V1, blockingQueue);
                            assertAsyncContext(K2, V2, blockingQueue);
                            assertAsyncContext(K3, V3, blockingQueue);
                            return item;
                        }), HttpSerializers.appSerializerUtf8FixLen()).transformPayloadBody(publisher ->
                                publisher.beforeSubscriber(() -> new PublisherSource.Subscriber<Buffer>() {
                                    public void onSubscribe(PublisherSource.Subscription subscription) {
                                        // K3 is expected to be absent here; it is only put while an item is mapped.
                                        assertAsyncContext(K1, V1, blockingQueue);
                                        assertAsyncContext(K2, V2, blockingQueue);
                                        assertAsyncContext(K3, null, blockingQueue);
                                    }

                                    public void onNext(Buffer buffer) {
                                        assertAsyncContext(K1, V1, blockingQueue);
                                        assertAsyncContext(K2, V2, blockingQueue);
                                        assertAsyncContext(K3, V3, blockingQueue);
                                    }

                                    public void onError(Throwable th) {
                                        assertAsyncContext(K1, V1, blockingQueue);
                                        assertAsyncContext(K2, V2, blockingQueue);
                                        assertAsyncContext(K3, V3, blockingQueue);
                                    }

                                    public void onComplete() {
                                        assertAsyncContext(K1, V1, blockingQueue);
                                        assertAsyncContext(K2, V2, blockingQueue);
                                        assertAsyncContext(K3, V3, blockingQueue);
                                    }
                                }));
                    });
        };
    }

    /**
     * Compares the current {@link AsyncContext} value for {@code key} with the expected value and, when
     * they differ, appends an {@link AssertionError} describing the mismatch to {@code queue}. Nothing is
     * thrown.
     *
     * @param key   the context key to look up
     * @param t     the expected value; {@code null} means the key is expected to have no value
     * @param queue destination for the assertion failure
     * @param <T>   the value type of the key
     */
    public static <T> void assertAsyncContext(ContextMap.Key<T> key, @Nullable T t, Queue<Throwable> queue) {
        final T actual = AsyncContext.get(key);
        final boolean matches = t == null ? actual == null : t.equals(actual);
        if (!matches) {
            queue.add(new AssertionError("unexpected value for " + key + ": " + actual + ", expected: " + t));
        }
    }
}
