package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.test.util.AssertEventually;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:io/confluent/ksql/api/client/impl/StreamedQueryResultImplTest.class */
public class StreamedQueryResultImplTest {

    @Mock
    private Subscriber<Row> subscriber;

    @Mock
    private Row row;
    private Vertx vertx;
    private Context context;
    private Subscription subscription;
    private StreamedQueryResultImpl queryResult;
    private volatile boolean subscriberReceivedRow;
    private volatile boolean subscriberCompleted;
    private volatile boolean subscriberFailed;

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        this.context = this.vertx.getOrCreateContext();
        ((Subscriber) Mockito.doAnswer(invocationOnMock -> {
            this.subscription = (Subscription) invocationOnMock.getArguments()[0];
            return null;
        }).when(this.subscriber)).onSubscribe((Subscription) ArgumentMatchers.any());
        this.subscriberReceivedRow = false;
        ((Subscriber) Mockito.doAnswer(invocationOnMock2 -> {
            this.subscriberReceivedRow = true;
            return null;
        }).when(this.subscriber)).onNext(this.row);
        this.subscriberCompleted = false;
        ((Subscriber) Mockito.doAnswer(invocationOnMock3 -> {
            this.subscriberCompleted = true;
            return null;
        }).when(this.subscriber)).onComplete();
        this.subscriberFailed = false;
        ((Subscriber) Mockito.doAnswer(invocationOnMock4 -> {
            this.subscriberFailed = true;
            return null;
        }).when(this.subscriber)).onError((Throwable) ArgumentMatchers.any());
        this.queryResult = new StreamedQueryResultImpl(this.context, "queryId", Collections.emptyList(), Collections.emptyList(), new AtomicReference(""), (String) null, (Map) null, (ClientImpl) null);
    }

    @Test
    public void shouldNotSubscribeIfPolling() {
        this.queryResult.poll(Duration.ofNanos(1L));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(IllegalStateException.class, () -> {
            this.queryResult.subscribe(this.subscriber);
        })).getMessage(), Matchers.containsString("Cannot set subscriber if polling"));
    }

    @Test
    public void shouldNotPollIfSubscribed() throws Exception {
        subscribe();
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(IllegalStateException.class, () -> {
            this.queryResult.poll();
        })).getMessage(), Matchers.containsString("Cannot poll if subscriber has been set"));
    }

    @Test
    public void shouldNotPollIfFailed() throws Exception {
        handleQueryResultError();
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(IllegalStateException.class, () -> {
            this.queryResult.poll();
        })).getMessage(), Matchers.containsString("Cannot poll on StreamedQueryResult that has failed"));
    }

    @Test
    public void shouldReturnFromPollOnError() throws Exception {
        this.queryResult.poll(Duration.ofNanos(1L));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        new Thread(() -> {
            StreamedQueryResultImpl.pollWithCallback(this.queryResult, () -> {
                countDownLatch.countDown();
            });
            countDownLatch2.countDown();
        }).start();
        awaitLatch(countDownLatch);
        handleQueryResultError();
        awaitLatch(countDownLatch2);
    }

    @Test
    public void shouldPropagateErrorToSubscriber() throws Exception {
        subscribe();
        handleQueryResultError();
        ((Subscriber) Mockito.verify(this.subscriber)).onError((Throwable) ArgumentMatchers.any());
    }

    @Test
    public void shouldDeliverBufferedRowsIfComplete() throws Exception {
        givenPublisherAcceptsOneRow();
        completeQueryResult();
        MatcherAssert.assertThat(this.queryResult.poll(), Matchers.is(this.row));
    }

    @Test
    public void shouldThrowOnContinueIfNoContinuationToken() {
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(KsqlClientException.class, () -> {
        })).getMessage(), Matchers.containsString("Can only continue queries that have saved a continuation token."));
    }

    @Test
    public void shouldDeliverBufferedRowsOnError() throws Exception {
        givenPublisherAcceptsOneRow();
        subscribe();
        handleQueryResultError();
        this.subscription.request(1L);
        AssertEventually.assertThatEventually(() -> {
            return Boolean.valueOf(this.subscriberReceivedRow);
        }, Matchers.is(true));
        AssertEventually.assertThatEventually(() -> {
            return Boolean.valueOf(this.subscriberFailed);
        }, Matchers.is(true));
    }

    @Test
    public void shouldNotSubscribeIfFailed() throws Exception {
        handleQueryResultError();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.context.runOnContext(r5 -> {
            MatcherAssert.assertThat(((Exception) Assert.assertThrows(IllegalStateException.class, () -> {
                this.queryResult.subscribe(this.subscriber);
            })).getMessage(), Matchers.containsString("Cannot subscribe to failed publisher"));
            countDownLatch.countDown();
        });
        awaitLatch(countDownLatch);
    }

    @Test
    public void shouldAllowSubscribeIfComplete() throws Exception {
        givenPublisherAcceptsOneRow();
        completeQueryResult();
        subscribe();
        this.subscription.request(1L);
        AssertEventually.assertThatEventually(() -> {
            return Boolean.valueOf(this.subscriberReceivedRow);
        }, Matchers.is(true));
        AssertEventually.assertThatEventually(() -> {
            return Boolean.valueOf(this.subscriberCompleted);
        }, Matchers.is(true));
    }

    private void subscribe() throws Exception {
        execOnContextAndWait(() -> {
            this.queryResult.subscribe(this.subscriber);
        });
    }

    private void handleQueryResultError() throws Exception {
        execOnContextAndWait(() -> {
            this.queryResult.handleError(new RuntimeException("boom"));
        });
    }

    private void completeQueryResult() throws Exception {
        execOnContextAndWait(() -> {
            this.queryResult.complete();
        });
    }

    private void givenPublisherAcceptsOneRow() throws Exception {
        execOnContextAndWait(() -> {
            this.queryResult.accept(this.row);
        });
    }

    private void execOnContextAndWait(Runnable runnable) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.context.runOnContext(r4 -> {
            runnable.run();
            countDownLatch.countDown();
        });
        awaitLatch(countDownLatch);
    }

    private static void awaitLatch(CountDownLatch countDownLatch) throws Exception {
        MatcherAssert.assertThat(Boolean.valueOf(countDownLatch.await(2000L, TimeUnit.MILLISECONDS)), Matchers.is(true));
    }
}
