package io.confluent.ksql.api.client.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/confluent/ksql/api/client/impl/PollableSubscriberTest.class */
public class PollableSubscriberTest {
    private static Duration POLL_DURATION = Duration.ofMillis(100);
    private static String COLUMN_NAME = "id";
    private Publisher<Row> publisher;
    private Vertx vertx;
    private Throwable throwable;
    private Context context;
    private PollableSubscriber pollableSubscriber;

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        this.context = this.vertx.getOrCreateContext();
        this.pollableSubscriber = new PollableSubscriber(this.context, th -> {
            this.throwable = th;
        });
    }

    @Test
    public void shouldPollSingleBatch() {
        shouldPollRows(90);
    }

    @Test
    public void shouldPollMultiBatch() {
        shouldPollRows(110);
    }

    @Test
    public void shouldSetError() {
        this.publisher = new BufferedPublisher<Row>(this.context, ImmutableList.of()) { // from class: io.confluent.ksql.api.client.impl.PollableSubscriberTest.1
            protected void maybeSend() {
                sendError(new RuntimeException("Error!"));
            }
        };
        this.publisher.subscribe(this.pollableSubscriber);
        MatcherAssert.assertThat(this.pollableSubscriber.poll(POLL_DURATION), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(this.throwable, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(this.throwable.getMessage(), Matchers.is("Error!"));
    }

    private void shouldPollRows(int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(createRow(i2));
        }
        this.publisher = new BufferedPublisher(this.context, arrayList);
        this.publisher.subscribe(this.pollableSubscriber);
        Row poll = this.pollableSubscriber.poll(POLL_DURATION);
        int i3 = 0;
        while (poll != null) {
            MatcherAssert.assertThat(poll.getLong(COLUMN_NAME), Matchers.is(Long.valueOf(i3)));
            poll = this.pollableSubscriber.poll(POLL_DURATION);
            i3++;
        }
        MatcherAssert.assertThat(Integer.valueOf(i3), Matchers.is(Integer.valueOf(i)));
        MatcherAssert.assertThat(this.throwable, Matchers.is(Matchers.nullValue()));
    }

    private Row createRow(long j) {
        return new RowImpl(ImmutableList.of(COLUMN_NAME), ImmutableList.of(new ColumnTypeImpl("BIGINT")), new JsonArray().add(Long.valueOf(j)), ImmutableMap.of(COLUMN_NAME, 1));
    }
}
