package io.confluent.ksql.api.client;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.api.BaseApiTest;
import io.confluent.ksql.api.TestQueryPublisher;
import io.confluent.ksql.api.client.ColumnType;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.ClientTestUtil;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.DropConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
import io.confluent.ksql.rest.entity.FieldInfo;
import io.confluent.ksql.rest.entity.FunctionDescriptionList;
import io.confluent.ksql.rest.entity.FunctionNameList;
import io.confluent.ksql.rest.entity.FunctionType;
import io.confluent.ksql.rest.entity.KafkaTopicInfo;
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.QueryDescription;
import io.confluent.ksql.rest.entity.QueryDescriptionEntity;
import io.confluent.ksql.rest.entity.QueryStatusCount;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.entity.TypeList;
import io.confluent.ksql.schema.ksql.types.SqlBaseType;
import io.confluent.ksql.test.util.AssertEventually;
import io.confluent.ksql.util.KsqlConstants;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import java.math.BigDecimal;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/client/ClientTest.class */
public class ClientTest extends BaseApiTest {
    // Push query text with a row limit; used by the "WithLimit" and buffered-row tests.
    protected static final String DEFAULT_PUSH_QUERY_WITH_LIMIT = "select * from foo emit changes limit 10;";
    // Message fragments about executeStatement(); not referenced in the portion of the class shown here.
    protected static final String EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC = "The ksqlDB server accepted the statement issued via executeStatement(), but the response received is of an unexpected format. ";
    protected static final String EXECUTE_STATEMENT_USAGE_DOC = "The executeStatement() method is only for 'CREATE', 'CREATE ... AS SELECT', 'DROP', 'TERMINATE', and 'INSERT INTO ... AS SELECT' statements. ";
    // Client under test; assigned in setUp() and closed in stopClient().
    protected Client javaClient;
    protected static final Logger log = LoggerFactory.getLogger(ClientTest.class);
    // Client-side views of the BaseApiTest defaults (these shadow the same-named BaseApiTest constants).
    protected static final List<String> DEFAULT_COLUMN_NAMES = BaseApiTest.DEFAULT_COLUMN_NAMES.getList();
    protected static final List<ColumnType> DEFAULT_COLUMN_TYPES = RowUtil.columnTypesFromStrings(BaseApiTest.DEFAULT_COLUMN_TYPES.getList());
    protected static final Map<String, Object> DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES = BaseApiTest.DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES.getMap();
    // Row fixtures built by helpers defined elsewhere in this class.
    // NOTE(review): presumably EXPECTED_ROWS mirrors DEFAULT_JSON_ROWS one-to-one - confirm in convertToClientRows.
    protected static final List<KsqlArray> EXPECTED_ROWS = convertToClientRows(DEFAULT_JSON_ROWS);
    protected static final List<KsqlObject> INSERT_ROWS = generateInsertRows();
    protected static final List<JsonObject> EXPECTED_INSERT_ROWS = convertToJsonRows(INSERT_ROWS);

    /* loaded from: input_file:io/confluent/ksql/api/client/ClientTest$LegacyStreamInfo.class */
    /**
     * Test-only entity exposing {@code name}, {@code topic} and {@code format} as JSON properties.
     * The statement text handed to {@link KsqlEntity} is always the fixed string {@code "sql text"}.
     * NOTE(review): presumably models a stream entry as returned by an older server - confirm.
     */
    private static class LegacyStreamInfo extends KsqlEntity {

        @JsonProperty("name")
        private final String name;

        @JsonProperty("topic")
        private final String topic;

        @JsonProperty("format")
        private final String format;

        /**
         * @param name   value serialized under the "name" property
         * @param topic  value serialized under the "topic" property
         * @param format value serialized under the "format" property
         */
        public LegacyStreamInfo(String name, String topic, String format) {
            super("sql text");
            this.name = name;
            this.topic = topic;
            this.format = format;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/api/client/ClientTest$LegacyStreamsList.class */
    /**
     * Test-only entity that wraps a collection of {@link LegacyStreamInfo} under the
     * "streams" JSON property.
     */
    private static class LegacyStreamsList extends KsqlEntity {

        @JsonProperty("streams")
        private final Collection<LegacyStreamInfo> streams;

        /**
         * @param statementText text passed through to the {@link KsqlEntity} constructor
         * @param streamInfos   entries to expose; the list is stored as given, not copied
         */
        public LegacyStreamsList(String statementText, List<LegacyStreamInfo> streamInfos) {
            super(statementText);
            this.streams = streamInfos;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/api/client/ClientTest$LegacyTableInfo.class */
    /**
     * Test-only entity exposing {@code name}, {@code topic}, {@code format} and
     * {@code isWindowed} as JSON properties. The statement text handed to
     * {@link KsqlEntity} is always the fixed string {@code "sql text"}.
     * NOTE(review): presumably models a table entry as returned by an older server - confirm.
     */
    private static class LegacyTableInfo extends KsqlEntity {

        @JsonProperty("name")
        private final String name;

        @JsonProperty("topic")
        private final String topic;

        @JsonProperty("format")
        private final String format;

        @JsonProperty("isWindowed")
        private final boolean windowed;

        /**
         * @param name     value serialized under the "name" property
         * @param topic    value serialized under the "topic" property
         * @param format   value serialized under the "format" property
         * @param windowed value serialized under the "isWindowed" property
         */
        public LegacyTableInfo(String name, String topic, String format, boolean windowed) {
            super("sql text");
            this.name = name;
            this.topic = topic;
            this.format = format;
            this.windowed = windowed;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/api/client/ClientTest$LegacyTablesList.class */
    /**
     * Test-only entity that wraps a collection of {@link LegacyTableInfo} under the
     * "tables" JSON property.
     */
    private static class LegacyTablesList extends KsqlEntity {

        @JsonProperty("tables")
        private final Collection<LegacyTableInfo> tables;

        /**
         * @param statementText text passed through to the {@link KsqlEntity} constructor
         * @param tableInfos    entries to expose; the list is stored as given, not copied
         */
        public LegacyTablesList(String statementText, List<LegacyTableInfo> tableInfos) {
            super(statementText);
            this.tables = tableInfos;
        }
    }

    /**
     * Runs the superclass setup first, then creates the Java client that every test
     * in this class uses.
     */
    public void setUp() {
        super.setUp();
        javaClient = createJavaClient();
    }

    /**
     * Always returns {@code null}.
     * NOTE(review): presumably a base-class hook, disabled here because these tests talk to
     * the server through {@code javaClient} instead of a Vert.x WebClient - confirm against BaseApiTest.
     */
    protected WebClient createClient() {
        return null;
    }

    /**
     * Closes the Java client if one was created. A failure while closing is logged
     * and otherwise ignored so it does not propagate to the caller.
     */
    protected void stopClient() {
        if (javaClient == null) {
            return;
        }
        try {
            javaClient.close();
        } catch (Exception closeFailure) {
            log.error("Failed to close client", closeFailure);
        }
    }

    /**
     * Streams an unbounded push query, checks the column metadata, delegates row
     * verification to {@code shouldReceiveRows}, and asserts that the query has an ID,
     * is registered on the server, and is not complete.
     */
    @Test
    public void shouldStreamPushQueryAsync() throws Exception {
        final String pushQuery = "select * from foo emit changes;";

        // When
        final StreamedQueryResult result =
            (StreamedQueryResult) javaClient.streamQuery(pushQuery, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        shouldReceiveRows(result, false);

        final String queryId = result.queryID();
        MatcherAssert.assertThat(queryId, Matchers.is(Matchers.notNullValue()));
        verifyPushQueryServerState(pushQuery, queryId);

        MatcherAssert.assertThat(result.isComplete(), Matchers.is(false));
    }

    /**
     * Streams an unbounded push query and polls one row per expected default row,
     * then asserts that the query has an ID, is registered on the server, and is not complete.
     */
    @Test
    public void shouldStreamPushQuerySync() throws Exception {
        final String pushQuery = "select * from foo emit changes;";

        // When
        final StreamedQueryResult result =
            (StreamedQueryResult) javaClient.streamQuery(pushQuery, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));

        for (int rowIndex = 0; rowIndex < DEFAULT_JSON_ROWS.size(); rowIndex++) {
            verifyRowWithIndex(result.poll(), rowIndex);
        }

        final String queryId = result.queryID();
        MatcherAssert.assertThat(queryId, Matchers.is(Matchers.notNullValue()));
        verifyPushQueryServerState(pushQuery, queryId);

        MatcherAssert.assertThat(result.isComplete(), Matchers.is(false));
    }

    /**
     * Streams a pull query, asserts that it carries no query ID, delegates row
     * verification to {@code shouldReceiveRows}, and waits for the result to complete.
     */
    @Test
    public void shouldStreamPullQueryAsync() throws Exception {
        // When
        final StreamedQueryResult result =
            (StreamedQueryResult) javaClient.streamQuery("select * from foo where rowkey='1234';").get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.nullValue()));

        shouldReceiveRows(result, true);
        verifyPullQueryServerState();

        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Streams a pull query, polls every expected row, asserts that one further poll
     * yields {@code null}, and waits for the result to complete.
     */
    @Test
    public void shouldStreamPullQuerySync() throws Exception {
        // When
        final StreamedQueryResult result =
            (StreamedQueryResult) javaClient.streamQuery("select * from foo where rowkey='1234';").get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.nullValue()));

        for (int rowIndex = 0; rowIndex < DEFAULT_JSON_ROWS.size(); rowIndex++) {
            verifyRowWithIndex(result.poll(), rowIndex);
        }
        // Nothing is left once every expected row has been polled.
        MatcherAssert.assertThat(result.poll(), Matchers.is(Matchers.nullValue()));

        verifyPullQueryServerState();

        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Streams a push query with a LIMIT clause, asserts that it has a query ID,
     * delegates row verification to {@code shouldReceiveRows}, and waits for completion.
     */
    @Test
    public void shouldStreamPushQueryWithLimitAsync() throws Exception {
        // When
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        shouldReceiveRows(result, true);
        verifyPushQueryServerState(DEFAULT_PUSH_QUERY_WITH_LIMIT);

        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Streams a push query with a LIMIT clause, polls every expected row, asserts that
     * one further poll yields {@code null}, and waits for completion.
     */
    @Test
    public void shouldStreamPushQueryWithLimitSync() throws Exception {
        // When
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(DEFAULT_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        for (int rowIndex = 0; rowIndex < DEFAULT_JSON_ROWS.size(); rowIndex++) {
            verifyRowWithIndex(result.poll(), rowIndex);
        }
        // Nothing is left once every expected row has been polled.
        MatcherAssert.assertThat(result.poll(), Matchers.is(Matchers.nullValue()));

        verifyPushQueryServerState(DEFAULT_PUSH_QUERY_WITH_LIMIT);

        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Verifies that a server-side failure while creating the query publisher surfaces
     * to the caller of {@code streamQuery(...).get()} as an {@link ExecutionException}
     * whose cause is a {@link KsqlClientException} carrying the 400 status and the
     * server's error message.
     */
    @Test
    public void shouldHandleErrorResponseFromStreamQuery() {
        // Given: the test endpoints fail query creation with a parse error.
        this.testEndpoints.setCreateQueryPublisherException(new ParseFailedException("invalid query blah", "bad query text"));

        // When: the runnable must actually issue the request; an empty body never throws
        // and would make assertThrows fail unconditionally.
        final Exception exc = Assert.assertThrows(
            ExecutionException.class, // thrown from get() when the future completes exceptionally
            () -> this.javaClient.streamQuery("bad query", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("invalid query blah"));
    }

    /**
     * Verifies that {@code poll()} is rejected with an {@link IllegalStateException}
     * once a subscriber has been attached to the streamed result.
     */
    @Test
    public void shouldFailPollStreamedQueryResultIfSubscribed() throws Exception {
        // Given
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        ClientTestUtil.subscribeAndWait(result);

        // When
        final Exception thrown = Assert.assertThrows(IllegalStateException.class, result::poll);

        // Then
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.containsString("Cannot poll if subscriber has been set"));
    }

    /**
     * Verifies that {@code subscribe()} is rejected with an {@link IllegalStateException}
     * once the streamed result has been polled.
     */
    @Test
    public void shouldFailSubscribeStreamedQueryResultIfPolling() throws Exception {
        // Given: a result that has already been polled (with a minimal timeout).
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        result.poll(Duration.ofNanos(1L));

        // When
        final Exception thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> result.subscribe(new ClientTestUtil.TestSubscriber()));

        // Then
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.containsString("Cannot set subscriber if polling"));
    }

    /**
     * Verifies that {@code poll()} is rejected with an {@link IllegalStateException}
     * after the streamed result has been marked as failed.
     */
    @Test
    public void shouldFailPollStreamedQueryResultIfFailed() throws Exception {
        // Given: a result that reports failure after a publisher error is sent.
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        sendQueryPublisherError();
        AssertEventually.assertThatEventually(result::isFailed, Matchers.is(true));

        // When
        final Exception thrown = Assert.assertThrows(IllegalStateException.class, () -> result.poll());

        // Then
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.containsString("Cannot poll on StreamedQueryResult that has failed"));
    }

    /**
     * Verifies that a {@code poll} in progress on a background thread returns {@code null}
     * once a publisher error is sent, rather than never returning.
     */
    @Test
    public void shouldReturnFromPollStreamedQueryResultOnError() throws Exception {
        // Given: all default rows have already been consumed.
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        for (int consumed = 0; consumed < DEFAULT_JSON_ROWS.size(); consumed++) {
            result.poll();
        }

        final CountDownLatch pollStarted = new CountDownLatch(1);
        final CountDownLatch pollReturned = new CountDownLatch(1);

        // When: a background thread polls again.
        // NOTE(review): presumably this poll blocks because no rows remain - confirm in pollWithCallback.
        final Thread poller = new Thread(() -> {
            MatcherAssert.assertThat(
                StreamedQueryResultImpl.pollWithCallback(result, () -> pollStarted.countDown()),
                Matchers.is(Matchers.nullValue()));
            pollReturned.countDown();
        });
        poller.start();

        ClientTestUtil.awaitLatch(pollStarted);
        sendQueryPublisherError();

        // Then: the background poll returns.
        ClientTestUtil.awaitLatch(pollReturned);
    }

    /**
     * Verifies that a publisher error reaches the subscriber as a {@link KsqlException}
     * with the generic processing-error message, and that the result ends up failed
     * rather than complete.
     */
    @Test
    public void shouldPropagateErrorWhenStreamingFromStreamQuery() throws Exception {
        // Given
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        final ClientTestUtil.TestSubscriber subscriber = ClientTestUtil.subscribeAndWait(result);

        // When
        sendQueryPublisherError();

        // Then
        AssertEventually.assertThatEventually(subscriber::getError, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(subscriber.getError(), Matchers.instanceOf(KsqlException.class));
        MatcherAssert.assertThat(
            subscriber.getError().getMessage(),
            Matchers.containsString("Error in processing query. Check server logs for details."));

        AssertEventually.assertThatEventually(result::isFailed, Matchers.is(true));
        MatcherAssert.assertThat(result.isComplete(), Matchers.is(false));
        MatcherAssert.assertThat(subscriber.isCompleted(), Matchers.equalTo(false));
    }

    /**
     * Verifies that rows can still be polled after the streamed result has completed,
     * and that a poll after the last expected row yields {@code null}.
     */
    @Test
    public void shouldDeliverBufferedRowsViaPollIfComplete() throws Exception {
        // Given: a limited push query that has already completed.
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));

        // When / Then
        for (int rowIndex = 0; rowIndex < DEFAULT_JSON_ROWS.size(); rowIndex++) {
            verifyRowWithIndex(result.poll(), rowIndex);
        }
        MatcherAssert.assertThat(result.poll(), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Verifies that a subscriber attached before a publisher error still receives the
     * expected rows, in addition to the error, when it requests them after the result
     * has failed.
     */
    @Test
    public void shouldDeliverBufferedRowsOnErrorIfStreaming() throws Exception {
        final int expectedRowCount = DEFAULT_JSON_ROWS.size();

        // Given: a subscriber with no demand yet, and a result that has failed.
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        final ClientTestUtil.TestSubscriber subscriber = ClientTestUtil.subscribeAndWait(result);
        sendQueryPublisherError();
        AssertEventually.assertThatEventually(result::isFailed, Matchers.is(true));
        MatcherAssert.assertThat(subscriber.getValues(), Matchers.hasSize(0));

        // When
        subscriber.getSub().request(expectedRowCount);

        // Then
        AssertEventually.assertThatEventually(subscriber::getError, Matchers.is(Matchers.notNullValue()));
        AssertEventually.assertThatEventually(subscriber::getValues, Matchers.hasSize(expectedRowCount));
        verifyRows(subscriber.getValues());
    }

    /**
     * Verifies that subscribing to a streamed result that has already failed is
     * rejected with an {@link IllegalStateException}.
     */
    @Test
    public void shouldFailSubscribeStreamedQueryResultOnError() throws Exception {
        // Given
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        sendQueryPublisherError();
        AssertEventually.assertThatEventually(result::isFailed, Matchers.is(true));

        // When
        final Exception thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> result.subscribe(new ClientTestUtil.TestSubscriber()));

        // Then
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.containsString("Cannot subscribe to failed publisher"));
    }

    /**
     * Verifies that a subscriber attached after the streamed result has completed
     * still receives all expected rows on request, with no error.
     */
    @Test
    public void shouldAllowSubscribeStreamedQueryResultIfComplete() throws Exception {
        final int expectedRowCount = DEFAULT_JSON_ROWS.size();

        // Given: a limited push query that has already completed.
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));

        // When
        final ClientTestUtil.TestSubscriber subscriber = ClientTestUtil.subscribeAndWait(result);
        MatcherAssert.assertThat(subscriber.getValues(), Matchers.hasSize(0));
        subscriber.getSub().request(expectedRowCount);

        // Then
        AssertEventually.assertThatEventually(subscriber::getValues, Matchers.hasSize(expectedRowCount));
        verifyRows(subscriber.getValues());
        MatcherAssert.assertThat(subscriber.getError(), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Executes a pull query as a batch and verifies that it has no query ID, returns
     * the expected rows, and leaves the expected pull-query state on the server.
     */
    @Test
    public void shouldExecutePullQuery() throws Exception {
        // When
        final BatchedQueryResult batchedResult = javaClient.executeQuery("select * from foo where rowkey='1234';");

        // Then
        MatcherAssert.assertThat(batchedResult.queryID().get(), Matchers.is(Matchers.nullValue()));
        verifyRows((List) batchedResult.get());
        verifyPullQueryServerState();
    }

    /**
     * Executes a push query with a LIMIT clause as a batch and verifies that it has a
     * query ID, returns the expected rows, and leaves the expected state on the server.
     */
    @Test
    public void shouldExecutePushWithLimitQuery() throws Exception {
        // When
        final BatchedQueryResult batchedResult =
            javaClient.executeQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES);

        // Then
        MatcherAssert.assertThat(batchedResult.queryID().get(), Matchers.is(Matchers.notNullValue()));
        verifyRows((List) batchedResult.get());
        verifyPushQueryServerState(DEFAULT_PUSH_QUERY_WITH_LIMIT);
    }

    /**
     * Verifies that a server-side failure while creating the query publisher completes
     * both the batched result future and its query-ID future exceptionally, each with a
     * {@link KsqlClientException} carrying the 400 status and the server's error message.
     */
    @Test
    public void shouldHandleErrorResponseFromExecuteQuery() {
        // Given: the test endpoints fail query creation with a parse error.
        this.testEndpoints.setCreateQueryPublisherException(new ParseFailedException("invalid query blah", "bad query text"));

        // When
        final BatchedQueryResult executeQuery = this.javaClient.executeQuery("bad query");
        final Exception exc = Assert.assertThrows(
            ExecutionException.class, // thrown from get() when the future completes exceptionally
            executeQuery::get);

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("invalid query blah"));

        // The query-ID future must fail the same way. The runnable has to call get() on it;
        // an empty body never throws and would make assertThrows fail unconditionally.
        final Exception exc2 = Assert.assertThrows(
            ExecutionException.class,
            () -> executeQuery.queryID().get());
        MatcherAssert.assertThat(exc2.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc2.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc2.getCause().getMessage(), Matchers.containsString("invalid query blah"));
    }

    /**
     * Verifies that terminating a push query started via {@code streamQuery} removes it
     * from the server's query IDs and eventually completes the streamed result.
     */
    @Test
    public void shouldTerminatePushQueryIssuedViaStreamQuery() throws Exception {
        // Given: one running push query registered on the server.
        final StreamedQueryResult result = (StreamedQueryResult) javaClient
            .streamQuery("select * from foo emit changes;", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES)
            .get();
        final String queryId = result.queryID();
        MatcherAssert.assertThat(queryId, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(server.getQueryIDs(), Matchers.hasSize(1));
        MatcherAssert.assertThat(server.getQueryIDs().contains(new PushQueryId(queryId)), Matchers.is(true));
        MatcherAssert.assertThat(result.isComplete(), Matchers.is(false));

        // When
        javaClient.terminatePushQuery(queryId).get();

        // Then
        MatcherAssert.assertThat(server.getQueryIDs(), Matchers.hasSize(0));
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Verifies that terminating a push query started via {@code executeQuery} removes it
     * from the server's query IDs and eventually completes the batched result normally.
     */
    @Test
    public void shouldTerminatePushQueryIssuedViaExecuteQuery() throws Exception {
        // Given: one running push query registered on the server.
        final BatchedQueryResult batchedResult = javaClient.executeQuery("select * from foo emit changes;");
        final String queryId = (String) batchedResult.queryID().get();
        MatcherAssert.assertThat(queryId, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(server.getQueryIDs(), Matchers.hasSize(1));
        MatcherAssert.assertThat(server.getQueryIDs().contains(new PushQueryId(queryId)), Matchers.is(true));
        MatcherAssert.assertThat(batchedResult.isDone(), Matchers.is(false));

        // When
        javaClient.terminatePushQuery(queryId).get();

        // Then
        MatcherAssert.assertThat(server.getQueryIDs(), Matchers.hasSize(0));
        AssertEventually.assertThatEventually(batchedResult::isDone, Matchers.is(true));
        MatcherAssert.assertThat(batchedResult.isCompletedExceptionally(), Matchers.is(false));
    }

    /**
     * Verifies that terminating a push query with an ID the server does not know
     * fails with an {@link ExecutionException} whose cause is a
     * {@link KsqlClientException} carrying the 400 status, the "No query with id"
     * message and the bad-request error code.
     */
    @Test
    public void shouldHandleErrorResponseFromTerminatePushQuery() {
        // When: no query has been started, so any ID is unknown to the server. The runnable
        // must actually issue the request; an empty body never throws and would make
        // assertThrows fail unconditionally.
        final Exception exc = Assert.assertThrows(
            ExecutionException.class, // thrown from get() when the future completes exceptionally
            () -> this.javaClient.terminatePushQuery("nonexistent").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("No query with id"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Error code: " + Errors.ERROR_CODE_BAD_REQUEST));
    }

    /**
     * Inserts a single row and verifies that the server-side inserts subscriber
     * received exactly that row, was completed and closed, and that the insert
     * targeted "test-stream".
     */
    @Test
    public void shouldInsertInto() throws Exception {
        // When
        javaClient.insertInto("test-stream", INSERT_ROWS.get(0)).get();

        // Then
        AssertEventually.assertThatEventually(
            () -> testEndpoints.getInsertsSubscriber().getRowsInserted(),
            Matchers.hasSize(1));
        MatcherAssert.assertThat(
            testEndpoints.getInsertsSubscriber().getRowsInserted().get(0),
            Matchers.is(EXPECTED_INSERT_ROWS.get(0)));
        AssertEventually.assertThatEventually(
            () -> testEndpoints.getInsertsSubscriber().isCompleted(),
            Matchers.is(true));
        AssertEventually.assertThatEventually(
            () -> testEndpoints.getInsertsSubscriber().isClosed(),
            Matchers.is(true));
        MatcherAssert.assertThat(testEndpoints.getLastTarget(), Matchers.is("test-stream"));
    }

    /**
     * Verifies that a server-side rejection while creating the inserts subscriber
     * surfaces to the caller of {@code insertInto(...).get()} as an
     * {@link ExecutionException} whose cause is a {@link KsqlClientException}
     * carrying the 400 status and the server's error message.
     */
    @Test
    public void shouldHandleErrorResponseFromInsertInto() {
        // Given: the test endpoints reject creation of the inserts subscriber.
        this.testEndpoints.setCreateInsertsSubscriberException(new KsqlApiException("Cannot insert into a table", Errors.ERROR_CODE_BAD_REQUEST));

        // When: the runnable must actually issue the insert; an empty body never throws
        // and would make assertThrows fail unconditionally.
        final Exception exc = Assert.assertThrows(
            ExecutionException.class, // thrown from get() when the future completes exceptionally
            () -> this.javaClient.insertInto("a-table", INSERT_ROWS.get(0)).get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Cannot insert into a table"));
    }

    /**
     * Verifies that an error raised by the server while processing an insert (configured
     * here to occur before any ack) surfaces to the caller of {@code insertInto(...).get()}
     * as an {@link ExecutionException} whose cause is a {@link KsqlClientException}
     * describing the /inserts-stream error.
     */
    @Test
    public void shouldHandleErrorFromInsertInto() {
        // Given: the acks publisher is configured to error after zero acks.
        this.testEndpoints.setAcksBeforePublisherError(0);

        // When: the runnable must actually issue the insert; an empty body never throws
        // and would make assertThrows fail unconditionally.
        final Exception exc = Assert.assertThrows(
            ExecutionException.class, // thrown from get() when the future completes exceptionally
            () -> this.javaClient.insertInto("test-stream", INSERT_ROWS.get(0)).get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received error from /inserts-stream"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Error code: 50000"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Message: Error in processing inserts. Check server logs for details."));
    }

    /**
     * Streams all insert rows through an {@code InsertsPublisher} and verifies that the
     * server received them in order, that one ack per row arrives with sequence numbers
     * counting up from zero, and that the acks publisher and its subscriber only
     * complete after the inserts publisher is completed.
     */
    @Test
    public void shouldStreamInserts() throws Exception {
        final int rowCount = INSERT_ROWS.size();

        // Given
        final InsertsPublisher inserts = new InsertsPublisher();

        // When
        final AcksPublisher acks = (AcksPublisher) javaClient.streamInserts("test-stream", inserts).get();
        for (final KsqlObject row : INSERT_ROWS) {
            inserts.accept(row);
        }

        final ClientTestUtil.TestSubscriber ackSubscriber = ClientTestUtil.subscribeAndWait(acks);
        ackSubscriber.getSub().request(rowCount);

        // Then: the server saw every row, in order, for the right target.
        AssertEventually.assertThatEventually(
            () -> testEndpoints.getInsertsSubscriber().getRowsInserted(),
            Matchers.hasSize(rowCount));
        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            MatcherAssert.assertThat(
                testEndpoints.getInsertsSubscriber().getRowsInserted().get(rowIndex),
                Matchers.is(EXPECTED_INSERT_ROWS.get(rowIndex)));
        }
        MatcherAssert.assertThat(testEndpoints.getLastTarget(), Matchers.is("test-stream"));

        // ...and every row was acked with its sequence number.
        AssertEventually.assertThatEventually(ackSubscriber::getValues, Matchers.hasSize(rowCount));
        MatcherAssert.assertThat(ackSubscriber.getError(), Matchers.is(Matchers.nullValue()));
        for (int ackIndex = 0; ackIndex < rowCount; ackIndex++) {
            final InsertAck ack = (InsertAck) ackSubscriber.getValues().get(ackIndex);
            MatcherAssert.assertThat(Long.valueOf(ack.seqNum()), Matchers.is(Long.valueOf(ackIndex)));
        }

        // Nothing has completed while the inserts publisher is still open.
        MatcherAssert.assertThat(ackSubscriber.isCompleted(), Matchers.is(false));
        MatcherAssert.assertThat(acks.isComplete(), Matchers.is(false));
        MatcherAssert.assertThat(acks.isFailed(), Matchers.is(false));

        // When
        inserts.complete();

        // Then
        AssertEventually.assertThatEventually(acks::isComplete, Matchers.is(true));
        MatcherAssert.assertThat(acks.isFailed(), Matchers.is(false));
        AssertEventually.assertThatEventually(ackSubscriber::isCompleted, Matchers.is(true));
    }

    /**
     * Verifies that a server-side rejection while creating the inserts subscriber
     * surfaces to the caller of {@code streamInserts(...).get()} as an
     * {@link ExecutionException} whose cause is a {@link KsqlClientException}
     * carrying the 400 status and the server's error message.
     */
    @Test
    public void shouldHandleErrorResponseFromStreamInserts() {
        // Given: the test endpoints reject creation of the inserts subscriber.
        this.testEndpoints.setCreateInsertsSubscriberException(new KsqlApiException("Cannot insert into a table", Errors.ERROR_CODE_BAD_REQUEST));

        // When: the runnable must actually open the inserts stream; an empty body never
        // throws and would make assertThrows fail unconditionally.
        final Exception exc = Assert.assertThrows(
            ExecutionException.class, // thrown from get() when the future completes exceptionally
            () -> this.javaClient.streamInserts("a-table", new InsertsPublisher()).get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Cannot insert into a table"));
    }

    /**
     * Configures the server to error after all but one ack, streams every insert row,
     * and verifies that the server received all rows, that acks arrive for all but the
     * last row, and that the acks publisher then reports failure rather than completion.
     */
    @Test
    public void shouldHandleErrorFromStreamInserts() throws Exception {
        final int rowCount = INSERT_ROWS.size();
        final int ackedCount = rowCount - 1;

        // Given
        testEndpoints.setAcksBeforePublisherError(ackedCount);
        final InsertsPublisher inserts = new InsertsPublisher();

        // When
        final AcksPublisher acks = (AcksPublisher) javaClient.streamInserts("test-stream", inserts).get();
        for (final KsqlObject row : INSERT_ROWS) {
            inserts.accept(row);
        }

        final ClientTestUtil.TestSubscriber ackSubscriber = ClientTestUtil.subscribeAndWait(acks);
        ackSubscriber.getSub().request(ackedCount);

        // Then: the server saw every row, in order, for the right target.
        AssertEventually.assertThatEventually(
            () -> testEndpoints.getInsertsSubscriber().getRowsInserted(),
            Matchers.hasSize(rowCount));
        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            MatcherAssert.assertThat(
                testEndpoints.getInsertsSubscriber().getRowsInserted().get(rowIndex),
                Matchers.is(EXPECTED_INSERT_ROWS.get(rowIndex)));
        }
        MatcherAssert.assertThat(testEndpoints.getLastTarget(), Matchers.is("test-stream"));

        // ...but only the rows before the error were acked.
        AssertEventually.assertThatEventually(ackSubscriber::getValues, Matchers.hasSize(ackedCount));
        for (int ackIndex = 0; ackIndex < ackedCount; ackIndex++) {
            final InsertAck ack = (InsertAck) ackSubscriber.getValues().get(ackIndex);
            MatcherAssert.assertThat(Long.valueOf(ack.seqNum()), Matchers.is(Long.valueOf(ackIndex)));
        }

        AssertEventually.assertThatEventually(ackSubscriber::getError, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(acks.isFailed(), Matchers.is(true));
        MatcherAssert.assertThat(acks.isComplete(), Matchers.is(false));
    }

    /**
     * Executes a statement whose command status carries the query id {@code CSAS_0} and checks
     * that the SQL and properties reached the endpoint and that the result exposes the query id.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldExecuteStatementWithQueryId() throws Exception {
        // Given
        CommandStatus status = new CommandStatus(CommandStatus.Status.SUCCESS, "Success", Optional.of(new QueryId("CSAS_0")));
        CommandStatusEntity entity = new CommandStatusEntity("CSAS;", new CommandId("STREAM", "FOO", "CREATE"), status, 0L);
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

        // When
        ExecuteStatementResult result = (ExecuteStatementResult) this.javaClient
                .executeStatement("CSAS;", ImmutableMap.of("auto.offset.reset", "earliest"))
                .get();

        // Then
        MatcherAssert.assertThat(this.testEndpoints.getLastSql(), Matchers.is("CSAS;"));
        MatcherAssert.assertThat(
                this.testEndpoints.getLastProperties(),
                Matchers.is(new JsonObject().put("auto.offset.reset", "earliest")));
        MatcherAssert.assertThat(result.queryId(), Matchers.is(Optional.of("CSAS_0")));
    }

    /**
     * Executes a statement whose command status is built without a query id and checks that the
     * SQL and properties reached the endpoint and that the result's query id is empty.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldExecuteStatementWithoutQueryId() throws Exception {
        // Given
        CommandStatus status = new CommandStatus(CommandStatus.Status.SUCCESS, "Success");
        CommandStatusEntity entity = new CommandStatusEntity("CSAS;", new CommandId("STREAM", "FOO", "CREATE"), status, 0L);
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

        // When
        ExecuteStatementResult result = (ExecuteStatementResult) this.javaClient
                .executeStatement("CSAS;", ImmutableMap.of("auto.offset.reset", "earliest"))
                .get();

        // Then
        MatcherAssert.assertThat(this.testEndpoints.getLastSql(), Matchers.is("CSAS;"));
        MatcherAssert.assertThat(
                this.testEndpoints.getLastProperties(),
                Matchers.is(new JsonObject().put("auto.offset.reset", "earliest")));
        MatcherAssert.assertThat(result.queryId(), Matchers.is(Optional.empty()));
    }

    /**
     * Expects {@code executeStatement()} to fail with an {@link ExecutionException} whose cause is
     * a {@link KsqlClientException} mentioning the 500 status and the server's message when the
     * test endpoint is set up to throw while executing the request.
     */
    @Test
    public void shouldHandleErrorResponseFromExecuteStatement() {
        // Given
        this.testEndpoints.setExecuteKsqlRequestException(new io.confluent.ksql.util.KsqlException("something bad"));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("CSAS;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 500 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("something bad"));
    }

    /**
     * Expects {@code executeStatement()} to reject SQL text containing more than one statement
     * with a {@link KsqlClientException} explaining that only one statement may be executed at a
     * time.
     */
    @Test
    public void shouldRejectMultipleRequestsFromExecuteStatement() {
        // When: two statements are submitted in a single call. The lambda must make the call;
        // an empty body never throws and makes assertThrows fail unconditionally.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("CSAS; CTAS;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("executeStatement() may only be used to execute one statement at a time"));
    }

    /**
     * Expects {@code executeStatement()} to reject SQL text that has no terminating semicolon
     * with a {@link KsqlClientException} describing the missing semicolon.
     */
    @Test
    public void shouldRejectRequestWithMissingSemicolonFromExecuteStatement() {
        // When: the SQL deliberately lacks a semicolon. The lambda must make the call; an empty
        // body never throws and makes assertThrows fail unconditionally.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("missing semicolon").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Missing semicolon in SQL for executeStatement() request"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} containing
     * both the request-accepted and the usage documentation text when the endpoint response
     * holds no entities.
     */
    @Test
    public void shouldFailOnNoEntitiesFromExecuteStatement() {
        // Given: an empty entity list is registered as the endpoint response.
        this.testEndpoints.setKsqlEndpointResponse(Collections.emptyList());

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("CSAS;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} that points
     * the caller at {@code listStreams()} when the endpoint response is a {@code StreamsList}.
     */
    @Test
    public void shouldFailToListStreamsViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new StreamsList("list streams;", Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("list streams;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Use the listStreams() method instead"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} that points
     * the caller at {@code listTables()} when the endpoint response is a {@code TablesList}.
     */
    @Test
    public void shouldFailToListTablesViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new TablesList("list tables;", Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("list tables;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Use the listTables() method instead"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} that points
     * the caller at {@code listTopics()} when the endpoint response is a
     * {@link KafkaTopicsList}.
     */
    @Test
    public void shouldFailToListTopicsViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new KafkaTopicsList("list topics;", Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("list topics;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Use the listTopics() method instead"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} that points
     * the caller at {@code listQueries()} when the endpoint response is a {@link Queries}
     * entity.
     */
    @Test
    public void shouldFailToListQueriesViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new Queries("list queries;", Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("list queries;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Use the listQueries() method instead"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * {@code DESCRIBE <STREAM/TABLE>} statements are unsupported when the endpoint response is a
     * {@code SourceDescriptionEntity}.
     */
    @Test
    public void shouldFailToDescribeSourceViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new SourceDescriptionEntity("describe source;", new SourceDescription("name", Optional.empty(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), "type", "timestamp", "statistics", "errorStats", false, "keyFormat", "valueFormat", "topic", 4, 1, "statement", Collections.emptyList()), Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("describe source;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support 'DESCRIBE <STREAM/TABLE>' statements"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * describing or listing functions is unsupported when the endpoint response is a
     * {@link FunctionNameList}.
     */
    @Test
    public void shouldFailToListFunctionsViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new FunctionNameList("list functions;", Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("list functions;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support 'DESCRIBE <FUNCTION>' statements or listing functions"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * describing or listing functions is unsupported when the endpoint response is a
     * {@link FunctionDescriptionList}.
     */
    @Test
    public void shouldFailToDescribeFunctionViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new FunctionDescriptionList("describe function;", "SUM", "sum", "Confluent", "version", "path", Collections.emptyList(), FunctionType.AGGREGATE)));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("describe function;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support 'DESCRIBE <FUNCTION>' statements or listing functions"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * {@code EXPLAIN <QUERY_ID>} statements are unsupported when the endpoint response is a
     * {@code QueryDescriptionEntity}.
     */
    @Test
    public void shouldFailToExplainQueryViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new QueryDescriptionEntity("explain query;", new QueryDescription(new QueryId("id"), "sql", Optional.empty(), Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), "topology", "executionPlan", Collections.emptyMap(), Collections.emptyMap(), KsqlConstants.KsqlQueryType.PERSISTENT, Collections.emptyList()))));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("explain query;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support 'EXPLAIN <QUERY_ID>' statements"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * listing properties is unsupported when the endpoint response is a {@link PropertiesList}.
     */
    @Test
    public void shouldFailToListPropertiesViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new PropertiesList("list properties;", Collections.emptyList(), Collections.emptyList(), Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("list properties;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support listing properties"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * listing custom types is unsupported when the endpoint response is a {@code TypeList}.
     */
    @Test
    public void shouldFailToListTypesViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new TypeList("list types;", Collections.emptyMap())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("list types;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support listing custom types"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * listing connectors is unsupported when the endpoint response is a {@link ConnectorList}.
     */
    @Test
    public void shouldFailToListConnectorsViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new ConnectorList("list connectors;", Collections.emptyList(), Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("list connectors;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support listing connectors"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * {@code DESCRIBE <CONNECTOR>} statements are unsupported when the endpoint response is a
     * {@link ConnectorDescription}.
     */
    @Test
    public void shouldFailToDescribeConnectorViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new ConnectorDescription("describe connector;", "connectorClass", new ConnectorStateInfo("name", new ConnectorStateInfo.ConnectorState("state", "worker", "msg"), Collections.emptyList(), ConnectorType.SOURCE), Collections.emptyList(), Collections.singletonList("topic"), Collections.emptyList())));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("describe connector;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support 'DESCRIBE <CONNECTOR>' statements"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} containing
     * the request-accepted text, the usage text, and a note that {@code CREATE CONNECTOR}
     * statements are unsupported when the endpoint response is a {@link CreateConnectorEntity}.
     */
    @Test
    public void shouldFailToCreateConnectorViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new CreateConnectorEntity("create connector;", new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), ConnectorType.SOURCE))));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("create connector;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support 'CREATE CONNECTOR' statements"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} containing
     * the request-accepted text, the usage text, and a note that {@code DROP CONNECTOR}
     * statements are unsupported when the endpoint response is a {@link DropConnectorEntity}.
     */
    @Test
    public void shouldFailToDropConnectorViaExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new DropConnectorEntity("drop connector;", "name")));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("drop connector;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support 'DROP CONNECTOR' statements"));
    }

    /**
     * Expects {@code executeStatement()} to fail with a {@link KsqlClientException} stating that
     * connector statements are unsupported when the endpoint response is an {@link ErrorEntity}.
     */
    @Test
    public void shouldFailOnErrorEntityFromExecuteStatement() {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(new ErrorEntity("create connector;", "error msg")));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.executeStatement("create connector;").get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("does not currently support statements for creating, dropping, listing, or describing connectors"));
    }

    /**
     * Registers a {@code StreamsList} with two streams as the endpoint response and checks that
     * {@code listStreams()} returns matching {@code StreamInfo} entries in the same order.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldListStreams() throws Exception {
        // Given
        ArrayList serverStreams = new ArrayList();
        serverStreams.add(new SourceInfo.Stream("stream1", "topic1", "KAFKA", "JSON", true));
        serverStreams.add(new SourceInfo.Stream("stream2", "topic2", "JSON", "AVRO", false));
        this.testEndpoints.setKsqlEndpointResponse(
                Collections.singletonList(new StreamsList("list streams;", serverStreams)));

        // When
        List streams = (List) this.javaClient.listStreams().get();

        // Then
        MatcherAssert.assertThat(streams, Matchers.hasSize(serverStreams.size()));

        StreamInfo first = (StreamInfo) streams.get(0);
        MatcherAssert.assertThat(first.getName(), Matchers.is("stream1"));
        MatcherAssert.assertThat(first.getTopic(), Matchers.is("topic1"));
        MatcherAssert.assertThat(first.getKeyFormat(), Matchers.is("KAFKA"));
        MatcherAssert.assertThat(first.getValueFormat(), Matchers.is("JSON"));
        MatcherAssert.assertThat(Boolean.valueOf(first.isWindowed()), Matchers.is(true));

        StreamInfo second = (StreamInfo) streams.get(1);
        MatcherAssert.assertThat(second.getName(), Matchers.is("stream2"));
        MatcherAssert.assertThat(second.getTopic(), Matchers.is("topic2"));
        MatcherAssert.assertThat(second.getKeyFormat(), Matchers.is("JSON"));
        MatcherAssert.assertThat(second.getValueFormat(), Matchers.is("AVRO"));
        MatcherAssert.assertThat(Boolean.valueOf(second.isWindowed()), Matchers.is(false));
    }

    /**
     * Registers a {@code TablesList} with two tables as the endpoint response and checks that
     * {@code listTables()} returns matching {@code TableInfo} entries in the same order.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldListTables() throws Exception {
        // Given
        ArrayList serverTables = new ArrayList();
        serverTables.add(new SourceInfo.Table("table1", "topic1", "KAFKA", "JSON", true));
        serverTables.add(new SourceInfo.Table("table2", "topic2", "JSON", "AVRO", false));
        this.testEndpoints.setKsqlEndpointResponse(
                Collections.singletonList(new TablesList("list tables;", serverTables)));

        // When
        List tables = (List) this.javaClient.listTables().get();

        // Then
        MatcherAssert.assertThat(tables, Matchers.hasSize(serverTables.size()));

        TableInfo first = (TableInfo) tables.get(0);
        MatcherAssert.assertThat(first.getName(), Matchers.is("table1"));
        MatcherAssert.assertThat(first.getTopic(), Matchers.is("topic1"));
        MatcherAssert.assertThat(first.getKeyFormat(), Matchers.is("KAFKA"));
        MatcherAssert.assertThat(first.getValueFormat(), Matchers.is("JSON"));
        MatcherAssert.assertThat(Boolean.valueOf(first.isWindowed()), Matchers.is(true));

        TableInfo second = (TableInfo) tables.get(1);
        MatcherAssert.assertThat(second.getName(), Matchers.is("table2"));
        MatcherAssert.assertThat(second.getTopic(), Matchers.is("topic2"));
        MatcherAssert.assertThat(second.getKeyFormat(), Matchers.is("JSON"));
        MatcherAssert.assertThat(second.getValueFormat(), Matchers.is("AVRO"));
        MatcherAssert.assertThat(Boolean.valueOf(second.isWindowed()), Matchers.is(false));
    }

    /**
     * Registers a {@code LegacyStreamsList} (one entry built with name, topic and a single
     * format) as the endpoint response and checks the {@code StreamInfo} that
     * {@code listStreams()} returns: key format {@code KAFKA}, value format {@code JSON},
     * not windowed.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldListStreamsFromOldServer() throws Exception {
        // Given
        ImmutableList legacyStreams = ImmutableList.of(new LegacyStreamInfo("stream1", "topic1", "JSON"));
        this.testEndpoints.setKsqlEndpointResponse(
                Collections.singletonList(new LegacyStreamsList("list streams;", legacyStreams)));

        // When
        List streams = (List) this.javaClient.listStreams().get();

        // Then
        MatcherAssert.assertThat(streams, Matchers.hasSize(legacyStreams.size()));

        StreamInfo only = (StreamInfo) streams.get(0);
        MatcherAssert.assertThat(only.getName(), Matchers.is("stream1"));
        MatcherAssert.assertThat(only.getTopic(), Matchers.is("topic1"));
        MatcherAssert.assertThat(only.getKeyFormat(), Matchers.is("KAFKA"));
        MatcherAssert.assertThat(only.getValueFormat(), Matchers.is("JSON"));
        MatcherAssert.assertThat(Boolean.valueOf(only.isWindowed()), Matchers.is(false));
    }

    /**
     * Registers a {@code LegacyTablesList} (one entry built with name, topic, a single format
     * and {@code true}) as the endpoint response and checks the {@code TableInfo} that
     * {@code listTables()} returns: key format {@code KAFKA}, value format {@code JSON},
     * windowed.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldListTablesFromOldServer() throws Exception {
        // Given
        ImmutableList legacyTables = ImmutableList.of(new LegacyTableInfo("table1", "topic1", "JSON", true));
        this.testEndpoints.setKsqlEndpointResponse(
                Collections.singletonList(new LegacyTablesList("list tables;", legacyTables)));

        // When
        List tables = (List) this.javaClient.listTables().get();

        // Then
        MatcherAssert.assertThat(tables, Matchers.hasSize(legacyTables.size()));

        TableInfo only = (TableInfo) tables.get(0);
        MatcherAssert.assertThat(only.getName(), Matchers.is("table1"));
        MatcherAssert.assertThat(only.getTopic(), Matchers.is("topic1"));
        MatcherAssert.assertThat(only.getKeyFormat(), Matchers.is("KAFKA"));
        MatcherAssert.assertThat(only.getValueFormat(), Matchers.is("JSON"));
        MatcherAssert.assertThat(Boolean.valueOf(only.isWindowed()), Matchers.is(true));
    }

    /**
     * Registers a {@link KafkaTopicsList} with two topics as the endpoint response and checks
     * that {@code listTopics()} returns matching {@code TopicInfo} entries: the partition count
     * equals the number of replica entries, and the replica list is passed through unchanged.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldListTopics() throws Exception {
        // Given
        ArrayList serverTopics = new ArrayList();
        serverTopics.add(new KafkaTopicInfo("topic1", ImmutableList.of(2, 2, 2)));
        serverTopics.add(new KafkaTopicInfo("topic2", ImmutableList.of(1, 1)));
        this.testEndpoints.setKsqlEndpointResponse(
                Collections.singletonList(new KafkaTopicsList("list topics;", serverTopics)));

        // When
        List topics = (List) this.javaClient.listTopics().get();

        // Then
        MatcherAssert.assertThat(topics, Matchers.hasSize(serverTopics.size()));

        TopicInfo first = (TopicInfo) topics.get(0);
        MatcherAssert.assertThat(first.getName(), Matchers.is("topic1"));
        MatcherAssert.assertThat(Integer.valueOf(first.getPartitions()), Matchers.is(3));
        MatcherAssert.assertThat(first.getReplicasPerPartition(), Matchers.is(ImmutableList.of(2, 2, 2)));

        TopicInfo second = (TopicInfo) topics.get(1);
        MatcherAssert.assertThat(second.getName(), Matchers.is("topic2"));
        MatcherAssert.assertThat(Integer.valueOf(second.getPartitions()), Matchers.is(2));
        MatcherAssert.assertThat(second.getReplicasPerPartition(), Matchers.is(ImmutableList.of(1, 1)));
    }

    /**
     * Expects {@code listTopics()} to fail with an {@link ExecutionException} whose cause is a
     * {@link KsqlClientException} mentioning the 500 status and the server's message when the
     * test endpoint is set up to throw a {@link KafkaResponseGetFailedException}.
     */
    @Test
    public void shouldHandleErrorFromListTopics() {
        // Given
        this.testEndpoints.setExecuteKsqlRequestException(new KafkaResponseGetFailedException("Failed to retrieve Kafka Topic names", new RuntimeException("boom")));

        // When: the lambda has to issue the request; an empty body never throws, so
        // assertThrows would fail before the assertions below could run.
        Exception exc = Assert.assertThrows(ExecutionException.class,
                () -> this.javaClient.listTopics().get());

        // Then
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 500 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Failed to retrieve Kafka Topic names"));
    }

    /**
     * Registers a {@link Queries} entity holding one persistent query (with a sink and sink
     * topic) and one push query (with neither) as the endpoint response, then checks the
     * {@link QueryInfo} entries returned by {@code listQueries()}.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldListQueries() throws Exception {
        // Given
        ArrayList serverQueries = new ArrayList();
        serverQueries.add(new RunningQuery(
                "sql1",
                ImmutableSet.of("sink"),
                ImmutableSet.of("sink_topic"),
                new QueryId("a_persistent_query"),
                new QueryStatusCount(ImmutableMap.of(KsqlConstants.KsqlQueryStatus.RUNNING, 1)),
                KsqlConstants.KsqlQueryType.PERSISTENT));
        serverQueries.add(new RunningQuery(
                "sql2",
                Collections.emptySet(),
                Collections.emptySet(),
                new QueryId("a_push_query"),
                new QueryStatusCount(),
                KsqlConstants.KsqlQueryType.PUSH));
        this.testEndpoints.setKsqlEndpointResponse(
                Collections.singletonList(new Queries("list queries;", serverQueries)));

        // When
        List queries = (List) this.javaClient.listQueries().get();

        // Then
        MatcherAssert.assertThat(queries, Matchers.hasSize(serverQueries.size()));

        QueryInfo persistent = (QueryInfo) queries.get(0);
        MatcherAssert.assertThat(persistent.getQueryType(), Matchers.is(QueryInfo.QueryType.PERSISTENT));
        MatcherAssert.assertThat(persistent.getId(), Matchers.is("a_persistent_query"));
        MatcherAssert.assertThat(persistent.getSql(), Matchers.is("sql1"));
        MatcherAssert.assertThat(persistent.getSink(), Matchers.is(Optional.of("sink")));
        MatcherAssert.assertThat(persistent.getSinkTopic(), Matchers.is(Optional.of("sink_topic")));

        QueryInfo push = (QueryInfo) queries.get(1);
        MatcherAssert.assertThat(push.getQueryType(), Matchers.is(QueryInfo.QueryType.PUSH));
        MatcherAssert.assertThat(push.getId(), Matchers.is("a_push_query"));
        MatcherAssert.assertThat(push.getSql(), Matchers.is("sql2"));
        MatcherAssert.assertThat(push.getSink(), Matchers.is(Optional.empty()));
        MatcherAssert.assertThat(push.getSinkTopic(), Matchers.is(Optional.empty()));
    }

    /**
     * Registers a {@code SourceDescriptionEntity} for a tumbling-windowed table with one read
     * query, no write queries and two fields (a STRING key and an INTEGER value) as the endpoint
     * response, then checks every accessor of the description returned by
     * {@code describeSource()}.
     *
     * @throws Exception if waiting on the client future fails
     */
    @Test
    public void shouldDescribeSource() throws Exception {
        // Given
        this.testEndpoints.setKsqlEndpointResponse(Collections.singletonList(
                new SourceDescriptionEntity(
                        "describe source;",
                        new SourceDescription(
                                "name",
                                Optional.of(WindowType.TUMBLING),
                                Collections.singletonList(new RunningQuery(
                                        "query_sql",
                                        ImmutableSet.of("sink"),
                                        ImmutableSet.of("sink_topic"),
                                        new QueryId("a_persistent_query"),
                                        new QueryStatusCount(ImmutableMap.of(KsqlConstants.KsqlQueryStatus.RUNNING, 1)),
                                        KsqlConstants.KsqlQueryType.PERSISTENT)),
                                Collections.emptyList(),
                                ImmutableList.of(
                                        new FieldInfo("f1", new SchemaInfo(SqlBaseType.STRING, (List) null, (SchemaInfo) null), Optional.of(FieldInfo.FieldType.KEY)),
                                        new FieldInfo("f2", new SchemaInfo(SqlBaseType.INTEGER, (List) null, (SchemaInfo) null), Optional.empty())),
                                "TABLE",
                                "",
                                "",
                                "",
                                false,
                                "KAFKA",
                                "JSON",
                                "topic",
                                4,
                                1,
                                "sql",
                                Collections.emptyList()),
                        Collections.emptyList())));

        // When
        SourceDescription description = (SourceDescription) this.javaClient.describeSource("source").get();

        // Then: identity and schema
        MatcherAssert.assertThat(description.name(), Matchers.is("name"));
        MatcherAssert.assertThat(description.type(), Matchers.is("TABLE"));
        MatcherAssert.assertThat(description.fields(), Matchers.hasSize(2));

        FieldInfo keyField = (FieldInfo) description.fields().get(0);
        MatcherAssert.assertThat(keyField.name(), Matchers.is("f1"));
        MatcherAssert.assertThat(keyField.type().getType(), Matchers.is(ColumnType.Type.STRING));
        MatcherAssert.assertThat(Boolean.valueOf(keyField.isKey()), Matchers.is(true));

        FieldInfo valueField = (FieldInfo) description.fields().get(1);
        MatcherAssert.assertThat(valueField.name(), Matchers.is("f2"));
        MatcherAssert.assertThat(valueField.type().getType(), Matchers.is(ColumnType.Type.INTEGER));
        MatcherAssert.assertThat(Boolean.valueOf(valueField.isKey()), Matchers.is(false));

        // Then: topic and formats
        MatcherAssert.assertThat(description.topic(), Matchers.is("topic"));
        MatcherAssert.assertThat(description.keyFormat(), Matchers.is("KAFKA"));
        MatcherAssert.assertThat(description.valueFormat(), Matchers.is("JSON"));

        // Then: queries reading from and writing to the source
        MatcherAssert.assertThat(description.readQueries(), Matchers.hasSize(1));
        QueryInfo readQuery = (QueryInfo) description.readQueries().get(0);
        MatcherAssert.assertThat(readQuery.getQueryType(), Matchers.is(QueryInfo.QueryType.PERSISTENT));
        MatcherAssert.assertThat(readQuery.getId(), Matchers.is("a_persistent_query"));
        MatcherAssert.assertThat(readQuery.getSql(), Matchers.is("query_sql"));
        MatcherAssert.assertThat(readQuery.getSink(), Matchers.is(Optional.of("sink")));
        MatcherAssert.assertThat(readQuery.getSinkTopic(), Matchers.is(Optional.of("sink_topic")));
        MatcherAssert.assertThat(description.writeQueries(), Matchers.hasSize(0));

        // Then: remaining metadata
        MatcherAssert.assertThat(description.timestampColumn(), Matchers.is(Optional.empty()));
        MatcherAssert.assertThat(description.windowType(), Matchers.is(Optional.of("TUMBLING")));
        MatcherAssert.assertThat(description.sqlStatement(), Matchers.is("sql"));
    }

    /**
     * Builds the client under test from {@link #createJavaClientOptions()} and this test's
     * Vert.x instance.
     *
     * @return a newly created client
     */
    protected Client createJavaClient() {
        ClientOptions options = createJavaClientOptions();
        return Client.create(options, this.vertx);
    }

    /**
     * Builds client options targeting {@code localhost} on the port of the server's first
     * listener.
     *
     * @return the options used to create the client under test
     */
    protected ClientOptions createJavaClientOptions() {
        URI firstListener = (URI) this.server.getListeners().get(0);
        int port = firstListener.getPort();
        return ClientOptions.create()
                .setHost("localhost")
                .setPort(port);
    }

    /**
     * Checks the server-side state after a push query without checking any query id; delegates
     * to the two-argument overload with a {@code null} id.
     *
     * @param str the SQL the endpoint is expected to have received last
     */
    private void verifyPushQueryServerState(String str) {
        this.verifyPushQueryServerState(str, null);
    }

    /**
     * Checks that the endpoint last received the given SQL together with the default push-query
     * request properties and, when a query id is supplied, that the server tracks exactly that
     * one push query.
     *
     * @param str  the SQL the endpoint is expected to have received last
     * @param str2 the expected push query id, or {@code null} to skip the query-id checks
     */
    private void verifyPushQueryServerState(String str, String str2) {
        MatcherAssert.assertThat(this.testEndpoints.getLastSql(), Matchers.is(str));
        MatcherAssert.assertThat(
                this.testEndpoints.getLastProperties(),
                Matchers.is(BaseApiTest.DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES));

        if (str2 == null) {
            // No query id to verify.
            return;
        }

        MatcherAssert.assertThat(this.server.getQueryIDs(), Matchers.hasSize(1));
        boolean tracked = this.server.getQueryIDs().contains(new PushQueryId(str2));
        MatcherAssert.assertThat(Boolean.valueOf(tracked), Matchers.is(true));
    }

    /**
     * Checks that the endpoint last received the fixed pull-query SQL with an empty property
     * map and that the server reports no query ids.
     */
    private void verifyPullQueryServerState() {
        final String expectedSql = "select * from foo where rowkey='1234';";

        MatcherAssert.assertThat(this.testEndpoints.getLastSql(), Matchers.is(expectedSql));
        MatcherAssert.assertThat(
                this.testEndpoints.getLastProperties().getMap(),
                Matchers.is(Collections.emptyMap()));
        MatcherAssert.assertThat(this.server.getQueryIDs(), Matchers.hasSize(0));
    }

    /**
     * Asserts that the endpoint holds exactly one query publisher and calls
     * {@code sendError()} on it.
     */
    private void sendQueryPublisherError() {
        Set publishers = this.testEndpoints.getQueryPublishers();
        MatcherAssert.assertThat(publishers, Matchers.hasSize(1));

        // Exactly one element is present (asserted above), so the first iterated element is it.
        TestQueryPublisher publisher = (TestQueryPublisher) publishers.iterator().next();
        publisher.sendError();
    }

    /**
     * Delegates to {@code ClientTestUtil.shouldReceiveRows}, passing the size of
     * {@code DEFAULT_JSON_ROWS} as the row count and {@link #verifyRows(List)} as the
     * verification callback.
     *
     * @param publisher the publisher to consume rows from
     * @param z         flag forwarded unchanged to {@code ClientTestUtil.shouldReceiveRows}
     */
    private static void shouldReceiveRows(Publisher<Row> publisher, boolean z) {
        int expectedRowCount = DEFAULT_JSON_ROWS.size();
        ClientTestUtil.shouldReceiveRows(publisher, expectedRowCount, ClientTest::verifyRows, z);
    }

    /**
     * Asserts that the list holds as many rows as {@code DEFAULT_JSON_ROWS} and verifies each
     * row against its position.
     *
     * @param list the rows received from the client
     */
    private static void verifyRows(List<Row> list) {
        MatcherAssert.assertThat(list, Matchers.hasSize(DEFAULT_JSON_ROWS.size()));

        // The size assertion above guarantees the positions run over exactly the expected range.
        int position = 0;
        for (Row row : list) {
            verifyRowWithIndex(row, position);
            position++;
        }
    }

    /**
     * Checks one received row against the values expected for position {@code i}.
     *
     * <p>The checks run in four groups: the row as a whole (values, column names, column types),
     * the row's typed accessors, the {@code KsqlArray} returned by {@code row.values()}, and the
     * {@code KsqlObject} returned by {@code row.asObject()}.
     *
     * @param row the row under test
     * @param i zero-based position of the row; every expected field value is derived from it
     */
    private static void verifyRowWithIndex(Row row, int i) {
        final String expectedStr = "foo" + i;
        final String expectedFirstItem = "s" + i;
        final String expectedSecondItem = "t" + i;
        final String expectedMapKey = "k" + i;
        final String expectedText = "v" + i;

        // --- Row as a whole ---
        MatcherAssert.assertThat(row.values(), Matchers.equalTo(EXPECTED_ROWS.get(i)));
        MatcherAssert.assertThat(row.columnNames(), Matchers.equalTo(DEFAULT_COLUMN_NAMES));
        MatcherAssert.assertThat(row.columnTypes(), Matchers.equalTo(DEFAULT_COLUMN_TYPES));

        // --- Typed accessors by column name ---
        MatcherAssert.assertThat(row.getString("f_str"), Matchers.is(expectedStr));
        MatcherAssert.assertThat(row.getInteger("f_int"), Matchers.is(i));
        MatcherAssert.assertThat(row.getBoolean("f_bool"), Matchers.is(i % 2 == 0));
        // Widen before multiplying so the product is computed as a long.
        MatcherAssert.assertThat(row.getLong("f_long"), Matchers.is((long) i * i));
        MatcherAssert.assertThat(row.getDouble("f_double"), Matchers.is(i + 0.1111d));
        MatcherAssert.assertThat(row.getDecimal("f_decimal"), Matchers.is(BigDecimal.valueOf(i + 0.1d)));

        final KsqlArray arrayField = row.getKsqlArray("f_array");
        MatcherAssert.assertThat(
            arrayField,
            Matchers.is(new KsqlArray().add(expectedFirstItem).add(expectedSecondItem)));
        MatcherAssert.assertThat(arrayField.getString(0), Matchers.is(expectedFirstItem));
        MatcherAssert.assertThat(arrayField.getString(1), Matchers.is(expectedSecondItem));

        final KsqlObject mapField = row.getKsqlObject("f_map");
        MatcherAssert.assertThat(mapField, Matchers.is(new KsqlObject().put(expectedMapKey, expectedText)));
        MatcherAssert.assertThat(mapField.getString(expectedMapKey), Matchers.is(expectedText));

        final KsqlObject structField = row.getKsqlObject("f_struct");
        MatcherAssert.assertThat(
            structField,
            Matchers.is(new KsqlObject().put("F1", expectedText).put("F2", Integer.valueOf(i))));
        MatcherAssert.assertThat(structField.getString("F1"), Matchers.is(expectedText));
        MatcherAssert.assertThat(structField.getInteger("F2"), Matchers.is(i));

        MatcherAssert.assertThat(row.getValue("f_null"), Matchers.is(Matchers.nullValue()));

        // Positional access on the row: index 1 must resolve to the same column as "f_str"
        // (the values array further down reads the same column at index 0).
        MatcherAssert.assertThat(row.getString(1), Matchers.is(row.getString("f_str")));

        MatcherAssert.assertThat(row.isNull("f_null"), Matchers.is(true));
        MatcherAssert.assertThat(row.isNull("f_bool"), Matchers.is(false));

        // Reading a string column through the integer accessor must fail with a ClassCastException.
        Assert.assertThrows(ClassCastException.class, () -> row.getInteger("f_str"));

        // --- Row viewed as a KsqlArray ---
        final KsqlArray asArray = row.values();
        MatcherAssert.assertThat(asArray.size(), Matchers.is(DEFAULT_COLUMN_NAMES.size()));
        MatcherAssert.assertThat(asArray.isEmpty(), Matchers.is(false));
        MatcherAssert.assertThat(asArray.getString(0), Matchers.is(row.getString("f_str")));
        MatcherAssert.assertThat(asArray.getInteger(1), Matchers.is(row.getInteger("f_int")));
        MatcherAssert.assertThat(asArray.getBoolean(2), Matchers.is(row.getBoolean("f_bool")));
        MatcherAssert.assertThat(asArray.getLong(3), Matchers.is(row.getLong("f_long")));
        MatcherAssert.assertThat(asArray.getDouble(4), Matchers.is(row.getDouble("f_double")));
        MatcherAssert.assertThat(asArray.getDecimal(5), Matchers.is(row.getDecimal("f_decimal")));
        MatcherAssert.assertThat(asArray.getKsqlArray(6), Matchers.is(row.getKsqlArray("f_array")));
        MatcherAssert.assertThat(asArray.getKsqlObject(7), Matchers.is(row.getKsqlObject("f_map")));
        MatcherAssert.assertThat(asArray.getKsqlObject(8), Matchers.is(row.getKsqlObject("f_struct")));
        MatcherAssert.assertThat(asArray.getValue(9), Matchers.is(Matchers.nullValue()));
        // Both string renderings must equal the JSON produced from the backing list.
        MatcherAssert.assertThat(
            asArray.toJsonString(),
            Matchers.is(new JsonArray(asArray.getList()).toString()));
        MatcherAssert.assertThat(asArray.toString(), Matchers.is(asArray.toJsonString()));

        // --- Row viewed as a KsqlObject ---
        final KsqlObject rowObject = row.asObject();
        MatcherAssert.assertThat(rowObject.size(), Matchers.is(DEFAULT_COLUMN_NAMES.size()));
        MatcherAssert.assertThat(rowObject.isEmpty(), Matchers.is(false));
        MatcherAssert.assertThat(rowObject.fieldNames(), Matchers.contains(DEFAULT_COLUMN_NAMES.toArray()));
        MatcherAssert.assertThat(rowObject.getString("f_str"), Matchers.is(row.getString("f_str")));
        MatcherAssert.assertThat(rowObject.getInteger("f_int"), Matchers.is(row.getInteger("f_int")));
        MatcherAssert.assertThat(rowObject.getBoolean("f_bool"), Matchers.is(row.getBoolean("f_bool")));
        MatcherAssert.assertThat(rowObject.getLong("f_long"), Matchers.is(row.getLong("f_long")));
        MatcherAssert.assertThat(rowObject.getDouble("f_double"), Matchers.is(row.getDouble("f_double")));
        MatcherAssert.assertThat(rowObject.getDecimal("f_decimal"), Matchers.is(row.getDecimal("f_decimal")));
        MatcherAssert.assertThat(rowObject.getKsqlArray("f_array"), Matchers.is(row.getKsqlArray("f_array")));
        MatcherAssert.assertThat(rowObject.getKsqlObject("f_map"), Matchers.is(row.getKsqlObject("f_map")));
        MatcherAssert.assertThat(rowObject.getKsqlObject("f_struct"), Matchers.is(row.getKsqlObject("f_struct")));
        MatcherAssert.assertThat(rowObject.getValue("f_null"), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(rowObject.containsKey("f_str"), Matchers.is(true));
        MatcherAssert.assertThat(rowObject.containsKey("f_bad"), Matchers.is(false));
        // Both string renderings must equal the JSON produced from the backing map.
        MatcherAssert.assertThat(
            rowObject.toJsonString(),
            Matchers.is(new JsonObject(rowObject.getMap()).toString()));
        MatcherAssert.assertThat(rowObject.toString(), Matchers.is(rowObject.toJsonString()));
    }

    /**
     * Wraps each {@code JsonArray} in a {@code KsqlArray} built from the array's backing list.
     *
     * <p>The stream is typed end to end, so no raw {@code List} cast (and no unchecked
     * conversion) is needed to produce the result.
     *
     * @param list the JSON rows to convert
     * @return a new list with one {@code KsqlArray} per input element, in the same order
     */
    private static List<KsqlArray> convertToClientRows(List<JsonArray> list) {
        return list.stream()
            .map(jsonRow -> new KsqlArray(jsonRow.getList()))
            .collect(Collectors.toList());
    }

    /**
     * Wraps each {@code KsqlObject} in a {@code JsonObject} built from the object's backing map.
     *
     * <p>The stream is typed end to end, so no raw {@code List} cast (and no unchecked
     * conversion) is needed to produce the result.
     *
     * @param list the client-side rows to convert
     * @return a new list with one {@code JsonObject} per input element, in the same order
     */
    private static List<JsonObject> convertToJsonRows(List<KsqlObject> list) {
        return list.stream()
            .map(clientRow -> new JsonObject(clientRow.getMap()))
            .collect(Collectors.toList());
    }

    /**
     * Builds ten rows, one per index {@code i} in {@code 0..9}, each populating the fields
     * {@code f_str}, {@code f_int}, {@code f_bool}, {@code f_long}, {@code f_double},
     * {@code f_decimal}, {@code f_array}, {@code f_map}, {@code f_struct} and {@code f_null}
     * with values derived from {@code i}.
     *
     * @return a new mutable list of the generated rows, in index order
     */
    private static List<KsqlObject> generateInsertRows() {
        final List<KsqlObject> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final KsqlObject row = new KsqlObject()
                .put("f_str", "foo" + i)
                .put("f_int", Integer.valueOf(i))
                .put("f_bool", Boolean.valueOf(i % 2 == 0))
                // f_long is deliberately supplied as an Integer here, exactly as before.
                .put("f_long", Integer.valueOf(i * i))
                .put("f_double", Double.valueOf(i + 0.1111d))
                // new BigDecimal(double) keeps the double's exact binary expansion; it is left
                // as is so the generated payload stays identical.
                .put("f_decimal", new BigDecimal(i + 0.1d))
                .put("f_array", new KsqlArray().add("s" + i).add("t" + i))
                .put("f_map", new KsqlObject().put("k" + i, "v" + i))
                .put("f_struct", new KsqlObject().put("F1", "v" + i).put("F2", Integer.valueOf(i)))
                .putNull("f_null");
            rows.add(row);
        }
        return rows;
    }
}
