package io.confluent.ksql.api.client;

import io.confluent.ksql.api.BaseApiTest;
import io.confluent.ksql.api.TestQueryPublisher;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.ClientTestUtil;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.test.util.AssertEventually;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import java.math.BigDecimal;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/client/ClientTest.class */
public class ClientTest extends BaseApiTest {
    protected static final String DEFAULT_PUSH_QUERY_WITH_LIMIT = "select * from foo emit changes limit 10;";
    protected Client javaClient;
    protected static final Logger log = LoggerFactory.getLogger(ClientTest.class);
    protected static final List<String> DEFAULT_COLUMN_NAMES = BaseApiTest.DEFAULT_COLUMN_NAMES.getList();
    protected static final List<ColumnType> DEFAULT_COLUMN_TYPES = RowUtil.columnTypesFromStrings(BaseApiTest.DEFAULT_COLUMN_TYPES.getList());
    protected static final Map<String, Object> DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES = BaseApiTest.DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES.getMap();
    protected static final List<KsqlArray> EXPECTED_ROWS = convertToClientRows(DEFAULT_JSON_ROWS);
    protected static final List<KsqlObject> INSERT_ROWS = generateInsertRows();
    protected static final List<JsonObject> EXPECTED_INSERT_ROWS = convertToJsonRows(INSERT_ROWS);

    public void setUp() {
        super.setUp();
        this.javaClient = createJavaClient();
    }

    protected WebClient createClient() {
        return null;
    }

    protected void stopClient() {
        if (this.javaClient != null) {
            try {
                this.javaClient.close();
            } catch (Exception e) {
                log.error("Failed to close client", e);
            }
        }
    }

    @Test
    public void shouldStreamPushQueryAsync() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        MatcherAssert.assertThat(streamedQueryResult.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(streamedQueryResult.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        shouldReceiveRows(streamedQueryResult, false);
        String queryID = streamedQueryResult.queryID();
        MatcherAssert.assertThat(queryID, Matchers.is(Matchers.notNullValue()));
        verifyPushQueryServerState("select * from foo emit changes;", queryID);
        MatcherAssert.assertThat(Boolean.valueOf(streamedQueryResult.isComplete()), Matchers.is(false));
    }

    @Test
    public void shouldStreamPushQuerySync() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        MatcherAssert.assertThat(streamedQueryResult.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(streamedQueryResult.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        for (int i = 0; i < DEFAULT_JSON_ROWS.size(); i++) {
            verifyRowWithIndex(streamedQueryResult.poll(), i);
        }
        String queryID = streamedQueryResult.queryID();
        MatcherAssert.assertThat(queryID, Matchers.is(Matchers.notNullValue()));
        verifyPushQueryServerState("select * from foo emit changes;", queryID);
        MatcherAssert.assertThat(Boolean.valueOf(streamedQueryResult.isComplete()), Matchers.is(false));
    }

    @Test
    public void shouldStreamPullQueryAsync() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo where rowkey='1234';").get();
        MatcherAssert.assertThat(streamedQueryResult.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(streamedQueryResult.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(streamedQueryResult.queryID(), Matchers.is(Matchers.nullValue()));
        shouldReceiveRows(streamedQueryResult, true);
        verifyPullQueryServerState();
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isComplete, Matchers.is(true));
    }

    @Test
    public void shouldStreamPullQuerySync() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo where rowkey='1234';").get();
        MatcherAssert.assertThat(streamedQueryResult.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(streamedQueryResult.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(streamedQueryResult.queryID(), Matchers.is(Matchers.nullValue()));
        for (int i = 0; i < DEFAULT_JSON_ROWS.size(); i++) {
            verifyRowWithIndex(streamedQueryResult.poll(), i);
        }
        MatcherAssert.assertThat(streamedQueryResult.poll(), Matchers.is(Matchers.nullValue()));
        verifyPullQueryServerState();
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isComplete, Matchers.is(true));
    }

    @Test
    public void shouldStreamPushQueryWithLimitAsync() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        MatcherAssert.assertThat(streamedQueryResult.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(streamedQueryResult.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(streamedQueryResult.queryID(), Matchers.is(Matchers.notNullValue()));
        shouldReceiveRows(streamedQueryResult, true);
        verifyPushQueryServerState(DEFAULT_PUSH_QUERY_WITH_LIMIT);
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isComplete, Matchers.is(true));
    }

    @Test
    public void shouldStreamPushQueryWithLimitSync() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        MatcherAssert.assertThat(streamedQueryResult.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(streamedQueryResult.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(streamedQueryResult.queryID(), Matchers.is(Matchers.notNullValue()));
        for (int i = 0; i < DEFAULT_JSON_ROWS.size(); i++) {
            verifyRowWithIndex(streamedQueryResult.poll(), i);
        }
        MatcherAssert.assertThat(streamedQueryResult.poll(), Matchers.is(Matchers.nullValue()));
        verifyPushQueryServerState(DEFAULT_PUSH_QUERY_WITH_LIMIT);
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isComplete, Matchers.is(true));
    }

    @Test
    public void shouldHandleErrorResponseFromStreamQuery() {
        this.testEndpoints.setCreateQueryPublisherException(new ParseFailedException("invalid query blah", "bad query text"));
        Exception exc = (Exception) Assert.assertThrows(ExecutionException.class, () -> {
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("invalid query blah"));
    }

    @Test
    public void shouldFailPollStreamedQueryResultIfSubscribed() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        ClientTestUtil.subscribeAndWait(streamedQueryResult);
        streamedQueryResult.getClass();
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(IllegalStateException.class, streamedQueryResult::poll)).getMessage(), Matchers.containsString("Cannot poll if subscriber has been set"));
    }

    @Test
    public void shouldFailSubscribeStreamedQueryResultIfPolling() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        streamedQueryResult.poll(Duration.ofNanos(1L));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(IllegalStateException.class, () -> {
            streamedQueryResult.subscribe(new ClientTestUtil.TestSubscriber());
        })).getMessage(), Matchers.containsString("Cannot set subscriber if polling"));
    }

    @Test
    public void shouldFailPollStreamedQueryResultIfFailed() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        sendQueryPublisherError();
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isFailed, Matchers.is(true));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(IllegalStateException.class, () -> {
            streamedQueryResult.poll();
        })).getMessage(), Matchers.containsString("Cannot poll on StreamedQueryResult that has failed"));
    }

    @Test
    public void shouldReturnFromPollStreamedQueryResultOnError() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        for (int i = 0; i < DEFAULT_JSON_ROWS.size(); i++) {
            streamedQueryResult.poll();
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        new Thread(() -> {
            MatcherAssert.assertThat(StreamedQueryResultImpl.pollWithCallback(streamedQueryResult, () -> {
                countDownLatch.countDown();
            }), Matchers.is(Matchers.nullValue()));
            countDownLatch2.countDown();
        }).start();
        ClientTestUtil.awaitLatch(countDownLatch);
        sendQueryPublisherError();
        ClientTestUtil.awaitLatch(countDownLatch2);
    }

    @Test
    public void shouldPropagateErrorWhenStreamingFromStreamQuery() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        ClientTestUtil.TestSubscriber subscribeAndWait = ClientTestUtil.subscribeAndWait(streamedQueryResult);
        sendQueryPublisherError();
        subscribeAndWait.getClass();
        AssertEventually.assertThatEventually(subscribeAndWait::getError, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(subscribeAndWait.getError(), Matchers.instanceOf(KsqlException.class));
        MatcherAssert.assertThat(subscribeAndWait.getError().getMessage(), Matchers.containsString("Error in processing query. Check server logs for details."));
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isFailed, Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(streamedQueryResult.isComplete()), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(subscribeAndWait.isCompleted()), Matchers.equalTo(false));
    }

    @Test
    public void shouldDeliverBufferedRowsViaPollIfComplete() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isComplete, Matchers.is(true));
        for (int i = 0; i < DEFAULT_JSON_ROWS.size(); i++) {
            verifyRowWithIndex(streamedQueryResult.poll(), i);
        }
        MatcherAssert.assertThat(streamedQueryResult.poll(), Matchers.is(Matchers.nullValue()));
    }

    @Test
    public void shouldDeliverBufferedRowsOnErrorIfStreaming() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        ClientTestUtil.TestSubscriber subscribeAndWait = ClientTestUtil.subscribeAndWait(streamedQueryResult);
        sendQueryPublisherError();
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isFailed, Matchers.is(true));
        MatcherAssert.assertThat(subscribeAndWait.getValues(), Matchers.hasSize(0));
        subscribeAndWait.getSub().request(DEFAULT_JSON_ROWS.size());
        subscribeAndWait.getClass();
        AssertEventually.assertThatEventually(subscribeAndWait::getError, Matchers.is(Matchers.notNullValue()));
        subscribeAndWait.getClass();
        AssertEventually.assertThatEventually(subscribeAndWait::getValues, Matchers.hasSize(DEFAULT_JSON_ROWS.size()));
        verifyRows(subscribeAndWait.getValues());
    }

    @Test
    public void shouldFailSubscribeStreamedQueryResultOnError() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        sendQueryPublisherError();
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isFailed, Matchers.is(true));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(IllegalStateException.class, () -> {
            streamedQueryResult.subscribe(new ClientTestUtil.TestSubscriber());
        })).getMessage(), Matchers.containsString("Cannot subscribe to failed publisher"));
    }

    @Test
    public void shouldAllowSubscribeStreamedQueryResultIfComplete() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isComplete, Matchers.is(true));
        ClientTestUtil.TestSubscriber subscribeAndWait = ClientTestUtil.subscribeAndWait(streamedQueryResult);
        MatcherAssert.assertThat(subscribeAndWait.getValues(), Matchers.hasSize(0));
        subscribeAndWait.getSub().request(DEFAULT_JSON_ROWS.size());
        subscribeAndWait.getClass();
        AssertEventually.assertThatEventually(subscribeAndWait::getValues, Matchers.hasSize(DEFAULT_JSON_ROWS.size()));
        verifyRows(subscribeAndWait.getValues());
        MatcherAssert.assertThat(subscribeAndWait.getError(), Matchers.is(Matchers.nullValue()));
    }

    @Test
    public void shouldExecutePullQuery() throws Exception {
        BatchedQueryResult executeQuery = this.javaClient.executeQuery("select * from foo where rowkey='1234';");
        MatcherAssert.assertThat(executeQuery.queryID().get(), Matchers.is(Matchers.nullValue()));
        verifyRows((List) executeQuery.get());
        verifyPullQueryServerState();
    }

    @Test
    public void shouldExecutePushWithLimitQuery() throws Exception {
        BatchedQueryResult executeQuery = this.javaClient.executeQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES);
        MatcherAssert.assertThat(executeQuery.queryID().get(), Matchers.is(Matchers.notNullValue()));
        verifyRows((List) executeQuery.get());
        verifyPushQueryServerState(DEFAULT_PUSH_QUERY_WITH_LIMIT);
    }

    @Test
    public void shouldHandleErrorResponseFromExecuteQuery() {
        this.testEndpoints.setCreateQueryPublisherException(new ParseFailedException("invalid query blah", "bad query text"));
        BatchedQueryResult executeQuery = this.javaClient.executeQuery("bad query");
        executeQuery.getClass();
        Exception exc = (Exception) Assert.assertThrows(ExecutionException.class, executeQuery::get);
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("invalid query blah"));
        Exception exc2 = (Exception) Assert.assertThrows(ExecutionException.class, () -> {
        });
        MatcherAssert.assertThat(exc2.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc2.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc2.getCause().getMessage(), Matchers.containsString("invalid query blah"));
    }

    @Test
    public void shouldTerminatePushQueryIssuedViaStreamQuery() throws Exception {
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) this.javaClient.streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
        String queryID = streamedQueryResult.queryID();
        MatcherAssert.assertThat(queryID, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(this.server.getQueryIDs(), Matchers.hasSize(1));
        MatcherAssert.assertThat(Boolean.valueOf(this.server.getQueryIDs().contains(new PushQueryId(queryID))), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(streamedQueryResult.isComplete()), Matchers.is(false));
        this.javaClient.terminatePushQuery(queryID).get();
        MatcherAssert.assertThat(this.server.getQueryIDs(), Matchers.hasSize(0));
        streamedQueryResult.getClass();
        AssertEventually.assertThatEventually(streamedQueryResult::isComplete, Matchers.is(true));
    }

    @Test
    public void shouldTerminatePushQueryIssuedViaExecuteQuery() throws Exception {
        BatchedQueryResult executeQuery = this.javaClient.executeQuery("select * from foo emit changes;");
        String str = (String) executeQuery.queryID().get();
        MatcherAssert.assertThat(str, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(this.server.getQueryIDs(), Matchers.hasSize(1));
        MatcherAssert.assertThat(Boolean.valueOf(this.server.getQueryIDs().contains(new PushQueryId(str))), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(executeQuery.isDone()), Matchers.is(false));
        this.javaClient.terminatePushQuery(str).get();
        MatcherAssert.assertThat(this.server.getQueryIDs(), Matchers.hasSize(0));
        executeQuery.getClass();
        AssertEventually.assertThatEventually(executeQuery::isDone, Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(executeQuery.isCompletedExceptionally()), Matchers.is(false));
    }

    @Test
    public void shouldHandleErrorResponseFromTerminatePushQuery() {
        Exception exc = (Exception) Assert.assertThrows(ExecutionException.class, () -> {
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("No query with id"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Error code: " + Errors.ERROR_CODE_BAD_REQUEST));
    }

    @Test
    public void shouldInsertInto() throws Exception {
        this.javaClient.insertInto("test-stream", INSERT_ROWS.get(0)).get();
        AssertEventually.assertThatEventually(() -> {
            return this.testEndpoints.getInsertsSubscriber().getRowsInserted();
        }, Matchers.hasSize(1));
        MatcherAssert.assertThat(this.testEndpoints.getInsertsSubscriber().getRowsInserted().get(0), Matchers.is(EXPECTED_INSERT_ROWS.get(0)));
        AssertEventually.assertThatEventually(() -> {
            return Boolean.valueOf(this.testEndpoints.getInsertsSubscriber().isCompleted());
        }, Matchers.is(true));
        AssertEventually.assertThatEventually(() -> {
            return Boolean.valueOf(this.testEndpoints.getInsertsSubscriber().isClosed());
        }, Matchers.is(true));
        MatcherAssert.assertThat(this.testEndpoints.getLastTarget(), Matchers.is("test-stream"));
    }

    @Test
    public void shouldHandleErrorResponseFromInsertInto() {
        this.testEndpoints.setCreateInsertsSubscriberException(new KsqlApiException("Cannot insert into a table", Errors.ERROR_CODE_BAD_REQUEST));
        Exception exc = (Exception) Assert.assertThrows(ExecutionException.class, () -> {
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Cannot insert into a table"));
    }

    @Test
    public void shouldHandleErrorFromInsertInto() {
        this.testEndpoints.setAcksBeforePublisherError(0);
        Exception exc = (Exception) Assert.assertThrows(ExecutionException.class, () -> {
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received error from /inserts-stream"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Error code: 50000"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Message: Error in processing inserts. Check server logs for details."));
    }

    protected Client createJavaClient() {
        return Client.create(createJavaClientOptions(), this.vertx);
    }

    protected ClientOptions createJavaClientOptions() {
        return ClientOptions.create().setHost("localhost").setPort(((URI) this.server.getListeners().get(0)).getPort());
    }

    private void verifyPushQueryServerState(String str) {
        verifyPushQueryServerState(str, null);
    }

    private void verifyPushQueryServerState(String str, String str2) {
        MatcherAssert.assertThat(this.testEndpoints.getLastSql(), Matchers.is(str));
        MatcherAssert.assertThat(this.testEndpoints.getLastProperties(), Matchers.is(BaseApiTest.DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES));
        if (str2 != null) {
            MatcherAssert.assertThat(this.server.getQueryIDs(), Matchers.hasSize(1));
            MatcherAssert.assertThat(Boolean.valueOf(this.server.getQueryIDs().contains(new PushQueryId(str2))), Matchers.is(true));
        }
    }

    private void verifyPullQueryServerState() {
        MatcherAssert.assertThat(this.testEndpoints.getLastSql(), Matchers.is("select * from foo where rowkey='1234';"));
        MatcherAssert.assertThat(this.testEndpoints.getLastProperties().getMap(), Matchers.is(Collections.emptyMap()));
        MatcherAssert.assertThat(this.server.getQueryIDs(), Matchers.hasSize(0));
    }

    private void sendQueryPublisherError() {
        Set queryPublishers = this.testEndpoints.getQueryPublishers();
        MatcherAssert.assertThat(queryPublishers, Matchers.hasSize(1));
        ((TestQueryPublisher) queryPublishers.stream().findFirst().get()).sendError();
    }

    private static void shouldReceiveRows(Publisher<Row> publisher, boolean z) {
        ClientTestUtil.shouldReceiveRows(publisher, DEFAULT_JSON_ROWS.size(), ClientTest::verifyRows, z);
    }

    private static void verifyRows(List<Row> list) {
        MatcherAssert.assertThat(list, Matchers.hasSize(DEFAULT_JSON_ROWS.size()));
        for (int i = 0; i < DEFAULT_JSON_ROWS.size(); i++) {
            verifyRowWithIndex(list.get(i), i);
        }
    }

    private static void verifyRowWithIndex(Row row, int i) {
        MatcherAssert.assertThat(row.values(), Matchers.equalTo(EXPECTED_ROWS.get(i)));
        MatcherAssert.assertThat(row.columnNames(), Matchers.equalTo(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(row.columnTypes(), Matchers.equalTo(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(row.getString("f_str"), Matchers.is("foo" + i));
        MatcherAssert.assertThat(row.getInteger("f_int"), Matchers.is(Integer.valueOf(i)));
        MatcherAssert.assertThat(row.getBoolean("f_bool"), Matchers.is(Boolean.valueOf(i % 2 == 0)));
        MatcherAssert.assertThat(row.getLong("f_long"), Matchers.is(Long.valueOf(Long.valueOf(i).longValue() * i)));
        MatcherAssert.assertThat(row.getDouble("f_double"), Matchers.is(Double.valueOf(i + 0.1111d)));
        MatcherAssert.assertThat(row.getDecimal("f_decimal"), Matchers.is(BigDecimal.valueOf(i + 0.1d)));
        KsqlArray ksqlArray = row.getKsqlArray("f_array");
        MatcherAssert.assertThat(ksqlArray, Matchers.is(new KsqlArray().add("s" + i).add("t" + i)));
        MatcherAssert.assertThat(ksqlArray.getString(0), Matchers.is("s" + i));
        MatcherAssert.assertThat(ksqlArray.getString(1), Matchers.is("t" + i));
        KsqlObject ksqlObject = row.getKsqlObject("f_map");
        MatcherAssert.assertThat(ksqlObject, Matchers.is(new KsqlObject().put("k" + i, "v" + i)));
        MatcherAssert.assertThat(ksqlObject.getString("k" + i), Matchers.is("v" + i));
        KsqlObject ksqlObject2 = row.getKsqlObject("f_struct");
        MatcherAssert.assertThat(ksqlObject2, Matchers.is(new KsqlObject().put("F1", "v" + i).put("F2", Integer.valueOf(i))));
        MatcherAssert.assertThat(ksqlObject2.getString("F1"), Matchers.is("v" + i));
        MatcherAssert.assertThat(ksqlObject2.getInteger("F2"), Matchers.is(Integer.valueOf(i)));
        MatcherAssert.assertThat(row.getValue("f_null"), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(row.getString(1), Matchers.is(row.getString("f_str")));
        MatcherAssert.assertThat(Boolean.valueOf(row.isNull("f_null")), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(row.isNull("f_bool")), Matchers.is(false));
        Assert.assertThrows(ClassCastException.class, () -> {
            row.getInteger("f_str");
        });
        KsqlArray values = row.values();
        MatcherAssert.assertThat(Integer.valueOf(values.size()), Matchers.is(Integer.valueOf(DEFAULT_COLUMN_NAMES.size())));
        MatcherAssert.assertThat(Boolean.valueOf(values.isEmpty()), Matchers.is(false));
        MatcherAssert.assertThat(values.getString(0), Matchers.is(row.getString("f_str")));
        MatcherAssert.assertThat(values.getInteger(1), Matchers.is(row.getInteger("f_int")));
        MatcherAssert.assertThat(values.getBoolean(2), Matchers.is(row.getBoolean("f_bool")));
        MatcherAssert.assertThat(values.getLong(3), Matchers.is(row.getLong("f_long")));
        MatcherAssert.assertThat(values.getDouble(4), Matchers.is(row.getDouble("f_double")));
        MatcherAssert.assertThat(values.getDecimal(5), Matchers.is(row.getDecimal("f_decimal")));
        MatcherAssert.assertThat(values.getKsqlArray(6), Matchers.is(row.getKsqlArray("f_array")));
        MatcherAssert.assertThat(values.getKsqlObject(7), Matchers.is(row.getKsqlObject("f_map")));
        MatcherAssert.assertThat(values.getKsqlObject(8), Matchers.is(row.getKsqlObject("f_struct")));
        MatcherAssert.assertThat(values.getValue(9), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(values.toJsonString(), Matchers.is(new JsonArray(values.getList()).toString()));
        MatcherAssert.assertThat(values.toString(), Matchers.is(values.toJsonString()));
        KsqlObject asObject = row.asObject();
        MatcherAssert.assertThat(Integer.valueOf(asObject.size()), Matchers.is(Integer.valueOf(DEFAULT_COLUMN_NAMES.size())));
        MatcherAssert.assertThat(Boolean.valueOf(asObject.isEmpty()), Matchers.is(false));
        MatcherAssert.assertThat(asObject.fieldNames(), Matchers.contains(DEFAULT_COLUMN_NAMES.toArray()));
        MatcherAssert.assertThat(asObject.getString("f_str"), Matchers.is(row.getString("f_str")));
        MatcherAssert.assertThat(asObject.getInteger("f_int"), Matchers.is(row.getInteger("f_int")));
        MatcherAssert.assertThat(asObject.getBoolean("f_bool"), Matchers.is(row.getBoolean("f_bool")));
        MatcherAssert.assertThat(asObject.getLong("f_long"), Matchers.is(row.getLong("f_long")));
        MatcherAssert.assertThat(asObject.getDouble("f_double"), Matchers.is(row.getDouble("f_double")));
        MatcherAssert.assertThat(asObject.getDecimal("f_decimal"), Matchers.is(row.getDecimal("f_decimal")));
        MatcherAssert.assertThat(asObject.getKsqlArray("f_array"), Matchers.is(row.getKsqlArray("f_array")));
        MatcherAssert.assertThat(asObject.getKsqlObject("f_map"), Matchers.is(row.getKsqlObject("f_map")));
        MatcherAssert.assertThat(asObject.getKsqlObject("f_struct"), Matchers.is(row.getKsqlObject("f_struct")));
        MatcherAssert.assertThat(asObject.getValue("f_null"), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(Boolean.valueOf(asObject.containsKey("f_str")), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(asObject.containsKey("f_bad")), Matchers.is(false));
        MatcherAssert.assertThat(asObject.toJsonString(), Matchers.is(new JsonObject(asObject.getMap()).toString()));
        MatcherAssert.assertThat(asObject.toString(), Matchers.is(asObject.toJsonString()));
    }

    private static List<KsqlArray> convertToClientRows(List<JsonArray> list) {
        return (List) list.stream().map(jsonArray -> {
            return new KsqlArray(jsonArray.getList());
        }).collect(Collectors.toList());
    }

    private static List<JsonObject> convertToJsonRows(List<KsqlObject> list) {
        return (List) list.stream().map(ksqlObject -> {
            return new JsonObject(ksqlObject.getMap());
        }).collect(Collectors.toList());
    }

    private static List<KsqlObject> generateInsertRows() {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(new KsqlObject().put("f_str", "foo" + i).put("f_int", Integer.valueOf(i)).put("f_bool", Boolean.valueOf(i % 2 == 0)).put("f_long", Integer.valueOf(i * i)).put("f_double", Double.valueOf(i + 0.1111d)).put("f_decimal", new BigDecimal(i + 0.1d)).put("f_array", new KsqlArray().add("s" + i).add("t" + i)).put("f_map", new KsqlObject().put("k" + i, "v" + i)).put("f_struct", new KsqlObject().put("F1", "v" + i).put("F2", Integer.valueOf(i))).putNull("f_null"));
        }
        return arrayList;
    }
}
