package org.asynchttpclient.reactivestreams;

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hive.druid.io.netty.channel.Channel;
import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.netty.handler.StreamedResponsePublisher;
import org.asynchttpclient.reactivestreams.ReactiveStreamsTest;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/asynchttpclient/reactivestreams/FailingReactiveStreamsTest.class */
/**
 * Exercises a streamed response whose underlying channel is closed while the
 * subscriber is still in the middle of consuming the body, and waits for the
 * handler's {@code onRetry()} callback to fire afterwards.
 */
public class FailingReactiveStreamsTest extends AbstractBasicTest {

    /**
     * Subscriber that signals when the first body part arrives and then blocks
     * inside {@link #onNext(HttpResponseBodyPart)} until the test releases it.
     */
    private static class BlockedStreamSubscriber extends ReactiveStreamsTest.SimpleSubscriber<HttpResponseBodyPart> {
        private static final Logger LOGGER = LoggerFactory.getLogger(BlockedStreamSubscriber.class);

        /** Counted down as soon as a body part is delivered. */
        private final CountDownLatch streamStarted;

        /** Awaited inside {@code onNext}; the test counts it down to let the stream proceed. */
        private final CountDownLatch streamOnHold;

        /**
         * @param streamStarted latch counted down when a body part is received
         * @param streamOnHold  latch this subscriber waits on before forwarding the part
         */
        BlockedStreamSubscriber(CountDownLatch streamStarted, CountDownLatch streamOnHold) {
            this.streamStarted = streamStarted;
            this.streamOnHold = streamOnHold;
        }

        /**
         * Announces that streaming has started, parks until released, then hands
         * the part to the parent implementation.
         *
         * @param bodyPart the body part delivered by the publisher
         */
        @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsTest.SimpleSubscriber
        public void onNext(HttpResponseBodyPart bodyPart) {
            this.streamStarted.countDown();
            try {
                this.streamOnHold.await();
            } catch (InterruptedException e) {
                LOGGER.error("`streamOnHold` latch was interrupted", e);
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            // Forward the part itself; it must not be cast to the subscriber type.
            super.onNext(bodyPart);
        }
    }

    /**
     * Streamed handler that counts down a latch whenever {@code onRetry()} is invoked.
     */
    private static class ReplayedSimpleAsyncHandler extends ReactiveStreamsTest.SimpleStreamedAsyncHandler {
        /** Counted down from {@link #onRetry()}. */
        private final CountDownLatch replaying;

        /**
         * @param replaying  latch counted down when {@code onRetry()} is called
         * @param subscriber subscriber passed to the parent handler
         */
        ReplayedSimpleAsyncHandler(CountDownLatch replaying, ReactiveStreamsTest.SimpleSubscriber<HttpResponseBodyPart> subscriber) {
            super(subscriber);
            this.replaying = replaying;
        }

        // NOTE(review): presumably overrides the retry callback declared by a
        // supertype; that declaration is not visible in this file - confirm
        // before adding @Override.
        public void onRetry() {
            this.replaying.countDown();
        }
    }

    /**
     * Posts a large body, waits until the subscriber has received a first body
     * part, closes the publisher's channel, releases the subscriber, and then
     * waits for both the channel close listener and {@code onRetry()}.
     *
     * <p>The test succeeds simply by reaching the end; the latch awaits have no
     * timeout, so a missing callback shows up as a hang.
     *
     * @throws Exception if the request, the reflective channel lookup, or
     *                   closing the client fails
     */
    @Test
    public void testRetryingOnFailingStream() throws Exception {
        try (AsyncHttpClient client = Dsl.asyncHttpClient()) {
            CountDownLatch streamStarted = new CountDownLatch(1);
            CountDownLatch streamOnHold = new CountDownLatch(1);
            CountDownLatch replayingRequest = new CountDownLatch(1);
            final AtomicReference<StreamedResponsePublisher> publisherRef = new AtomicReference<>(null);

            client.preparePost(getTargetUrl())
                    .setBody(TestUtils.LARGE_IMAGE_BYTES)
                    .execute(new ReplayedSimpleAsyncHandler(replayingRequest, new BlockedStreamSubscriber(streamStarted, streamOnHold)) {
                        @Override // org.asynchttpclient.reactivestreams.ReactiveStreamsTest.SimpleStreamedAsyncHandler
                        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
                            if (!(publisher instanceof StreamedResponsePublisher)) {
                                throw new IllegalStateException(String.format("publisher %s is expected to be an instance of %s", publisher, StreamedResponsePublisher.class));
                            }
                            // Only the first publisher is recorded and streamed;
                            // any later call is answered with ABORT.
                            if (!publisherRef.compareAndSet(null, (StreamedResponsePublisher) publisher)) {
                                return AsyncHandler.State.ABORT;
                            }
                            return super.onStream(publisher);
                        }
                    });

            // Wait for the subscriber to receive its first body part.
            streamStarted.await();
            Assert.assertTrue(publisherRef.get() != null, "Expected a not null publisher.");
            StreamedResponsePublisher publisher = publisherRef.get();

            // Close the channel while the subscriber is still parked in onNext.
            CountDownLatch channelClosed = new CountDownLatch(1);
            getChannel(publisher).close().addListener(future -> {
                channelClosed.countDown();
            });

            // Release the subscriber, then wait for the close and the retry callback.
            streamOnHold.countDown();
            channelClosed.await();
            replayingRequest.await();
        }
    }

    /**
     * Reads the {@code channel} field declared on the publisher's runtime class
     * via reflection.
     *
     * @param streamedResponsePublisher publisher to inspect
     * @return the value of its {@code channel} field
     * @throws Exception if the field is missing or cannot be read
     */
    private Channel getChannel(StreamedResponsePublisher streamedResponsePublisher) throws Exception {
        Field channelField = streamedResponsePublisher.getClass().getDeclaredField("channel");
        channelField.setAccessible(true);
        return (Channel) channelField.get(streamedResponsePublisher);
    }
}
