package io.confluent.ksql.api.client.integration;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ColumnType;
import io.confluent.ksql.api.client.ConnectorDescription;
import io.confluent.ksql.api.client.ConnectorInfo;
import io.confluent.ksql.api.client.ConnectorType;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.FieldInfo;
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.ServerInfo;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.impl.ConnectorTypeImpl;
import io.confluent.ksql.api.client.util.ClientTestUtil;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.ConnectExecutable;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SqlTimeTypes;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.test.util.AssertEventually;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.StructuredTypesDataProvider;
import io.confluent.ksql.util.TestDataProvider;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.math.BigDecimal;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.reactivestreams.Publisher;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/ksql/api/client/integration/ClientIntegrationTest.class */
public class ClientIntegrationTest {
    // Name of the aggregate table created in setUpClass() and targeted by the pull queries below.
    private static final String AGG_TABLE = "AGG_TABLE";
    // SQL literal for a key of AGG_TABLE; the same text is embedded in PULL_QUERY_ON_TABLE.
    private static final String AN_AGG_KEY = "STRUCT(F1 := ARRAY['a'])";
    // Pull query that looks up a single key in AGG_TABLE.
    private static final String PULL_QUERY_ON_TABLE = "SELECT * from AGG_TABLE WHERE K=STRUCT(F1 := ARRAY['a']);";
    // Message fragments describing executeStatement() behaviour; not referenced in this section of the file.
    protected static final String EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC = "The ksqlDB server accepted the statement issued via executeStatement(), but the response received is of an unexpected format. ";
    protected static final String EXECUTE_STATEMENT_USAGE_DOC = "The executeStatement() method is only for 'CREATE', 'CREATE ... AS SELECT', 'DROP', 'TERMINATE', and 'INSERT INTO ... AS SELECT' statements. ";
    // Connector name and connector class; not referenced in this section of the file.
    private static final String TEST_CONNECTOR = "TEST_CONNECTOR";
    private static final String MOCK_SOURCE_CLASS = "org.apache.kafka.connect.tools.MockSourceConnector";
    // Number of push queries opened by shouldStreamMultiplePushQueries().
    private static final int NUM_CONCURRENT_REQUESTS_TO_TEST = 5;
    // NOTE(review): presumably the server worker pool size; its use is not visible in this section.
    private static final int WORKER_POOL_SIZE = 10;
    // The ksqlDB REST server under test; it has no initialiser here, so it is assigned elsewhere in the class.
    private static final TestKsqlRestApp REST_APP;

    // JUnit class-level rule chain; it has no initialiser here, so it is assigned elsewhere in the class.
    @ClassRule
    public static final RuleChain CHAIN;
    // Connect worker started at the end of setUpClass() and shut down in classTearDown().
    private static ConnectExecutable CONNECT;
    // Per-test Vert.x instance and ksqlDB client, created in setUp() and closed in tearDown().
    private Vertx vertx;
    private Client client;
    // Primary data set: its rows are produced to TEST_TOPIC and exposed as TEST_STREAM in setUpClass().
    private static final StructuredTypesDataProvider TEST_DATA_PROVIDER = new StructuredTypesDataProvider();
    private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName();
    private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName();
    private static final int TEST_NUM_ROWS = TEST_DATA_PROVIDER.data().size();
    // Column names and types that queries selecting * from the test streams are expected to report.
    private static final List<String> TEST_COLUMN_NAMES = ImmutableList.of("K", "STR", "LONG", "DEC", "BYTES_", "ARRAY", "MAP", "STRUCT", "COMPLEX", "TIMESTAMP", "DATE", "TIME", new String[]{"HEAD"});
    private static final List<ColumnType> TEST_COLUMN_TYPES = RowUtil.columnTypesFromStrings(ImmutableList.of("STRUCT", "STRING", "BIGINT", "DECIMAL", "BYTES", "ARRAY", "MAP", "STRUCT", "STRUCT", "TIMESTAMP", "DATE", "TIME", new String[]{"BYTES"}));
    // The provider's rows converted to the client's KsqlArray representation.
    private static final List<KsqlArray> TEST_EXPECTED_ROWS = convertToClientRows(TEST_DATA_PROVIDER.data());
    // Serialization formats used when producing and verifying test records.
    private static final Format KEY_FORMAT = FormatFactory.JSON;
    private static final Format VALUE_FORMAT = FormatFactory.JSON;
    // Every produced record gets timestamp 0 and a single header "h0" whose value is the byte 23.
    private static final Supplier<Long> TS_SUPPLIER = () -> {
        return 0L;
    };
    private static final Supplier<List<Header>> HEADERS_SUPPLIER = () -> {
        return ImmutableList.of(new RecordHeader("h0", new byte[]{23}));
    };
    // Physical schema of AGG_TABLE: struct key K (field F1, an array of strings) and a BIGINT value column LONG.
    private static final PhysicalSchema AGG_SCHEMA = PhysicalSchema.from(LogicalSchema.builder().keyColumn(ColumnName.of("K"), SqlTypes.struct().field("F1", SqlTypes.array(SqlTypes.STRING)).build()).valueColumn(ColumnName.of("LONG"), SqlTypes.BIGINT).build(), SerdeFeatures.of(new SerdeFeature[]{SerdeFeature.UNWRAP_SINGLES}), SerdeFeatures.of(new SerdeFeature[0]));
    // Stream with the same schema as the primary data set but no rows.
    private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER = new TestDataProvider("EMPTY_STRUCTURED_TYPES", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of());
    private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName();
    private static final String EMPTY_TEST_STREAM = EMPTY_TEST_DATA_PROVIDER.sourceName();
    // Stream whose topic has rows produced to it and then deleted via truncateTopic() in setUpClass().
    private static final TestDataProvider TRUNCATED_TEST_DATA_PROVIDER = new StructuredTypesDataProvider("TRUNCATED_STRUCTURED_TYPES");
    private static final String TRUNCATED_TEST_TOPIC = TRUNCATED_TEST_DATA_PROVIDER.topicName();
    private static final String TRUNCATED_TEST_STREAM = TRUNCATED_TEST_DATA_PROVIDER.sourceName();
    // Query texts issued against TEST_STREAM by the tests.
    private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;";
    private static final String PULL_QUERY_ON_STREAM = "SELECT * FROM " + TEST_STREAM + ";";
    private static final int PUSH_QUERY_LIMIT_NUM_ROWS = 2;
    private static final String PUSH_QUERY_WITH_LIMIT = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES LIMIT " + PUSH_QUERY_LIMIT_NUM_ROWS + ";";
    // Column names and types expected from PULL_QUERY_ON_TABLE.
    private static final List<String> PULL_QUERY_COLUMN_NAMES = ImmutableList.of("K", "LONG");
    private static final List<ColumnType> PULL_QUERY_COLUMN_TYPES = RowUtil.columnTypesFromStrings(ImmutableList.of("STRUCT", "BIGINT"));
    private static final int EVENT_LOOP_POOL_SIZE = 1;
    // NOTE(review): the expected LONG value below is spelled as EVENT_LOOP_POOL_SIZE (1); this looks like an
    // accidental reuse of a constant that happens to equal 1 rather than an intentional link - confirm.
    private static final KsqlArray PULL_QUERY_EXPECTED_ROW = new KsqlArray().add(new KsqlObject().put("F1", new KsqlArray().add("a"))).add(Integer.valueOf(EVENT_LOOP_POOL_SIZE));
    private static final ConnectorType SOURCE_TYPE = new ConnectorTypeImpl("SOURCE");
    // Test harness used here for topic management, producing rows and verifying table contents.
    private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

    /**
     * One-time fixture setup shared by every test in the class.
     *
     * <p>Creates the test topics, produces the test data, registers the three source streams,
     * creates {@code AGG_TABLE} and waits for its rows, then writes a Connect worker
     * configuration file and starts the Connect worker asynchronously.
     *
     * @throws Exception if any step of the setup fails
     */
    @BeforeClass
    public static void setUpClass() throws Exception {
        TEST_HARNESS.ensureTopics(new String[]{TEST_TOPIC, EMPTY_TEST_TOPIC, TRUNCATED_TEST_TOPIC});

        // Primary stream: populated with the full data set.
        TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT, TS_SUPPLIER, HEADERS_SUPPLIER);
        RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER);

        // Truncated stream: rows are produced and then deleted before the stream is registered.
        TEST_HARNESS.produceRows(TRUNCATED_TEST_TOPIC, TRUNCATED_TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT, TS_SUPPLIER, HEADERS_SUPPLIER);
        truncateTopic(TRUNCATED_TEST_TOPIC);
        RestIntegrationTestUtil.createStream(REST_APP, TRUNCATED_TEST_DATA_PROVIDER);

        // Empty stream: the topic exists but never receives data.
        RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER);

        makeKsqlRequest("CREATE TABLE AGG_TABLE AS SELECT K, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY K;");
        TEST_HARNESS.verifyAvailableUniqueRows(AGG_TABLE, 4, KEY_FORMAT, VALUE_FORMAT, AGG_SCHEMA);

        // Write the Connect worker configuration into a fresh temp directory and start the worker.
        final String configDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "client_integ_test").toString();
        final String connectConfigFile = Paths.get(configDir, "connect.properties").toString();
        Files.createDirectories(Paths.get(configDir));

        // The explicit type witness is required: without it builder() yields
        // ImmutableMap<Object, Object>, which is not a Map<String, String>.
        final Map<String, String> connectConfigs = ImmutableMap.<String, String>builder()
            .put("bootstrap.servers", TEST_HARNESS.kafkaBootstrapServers())
            .put("group.id", UUID.randomUUID().toString())
            .put("key.converter", StringConverter.class.getName())
            .put("value.converter", JsonConverter.class.getName())
            .put("offset.storage.topic", "connect-offsets")
            .put("status.storage.topic", "connect-status")
            .put("config.storage.topic", "connect-config")
            .put("offset.storage.replication.factor", "1")
            .put("status.storage.replication.factor", "1")
            .put("config.storage.replication.factor", "1")
            .put("value.converter.schemas.enable", "false")
            .build();
        writeConnectConfigs(connectConfigFile, connectConfigs);

        CONNECT = ConnectExecutable.of(connectConfigFile);
        CONNECT.startAsync();
    }

    /**
     * Deletes every record currently in the given topic and verifies the deletion took effect.
     *
     * <p>The latest offset of each partition is looked up, all records before that offset are
     * deleted, and the earliest offset of each partition is then asserted to equal the
     * previously observed latest offset.
     *
     * @param topicName the topic to truncate
     * @throws InterruptedException if interrupted while waiting on an admin request
     * @throws ExecutionException if an admin request fails
     */
    private static void truncateTopic(String topicName) throws InterruptedException, ExecutionException {
        // try-with-resources closes the admin client on every path and keeps the primary
        // exception if close() also fails.
        try (Admin admin = Admin.create(TEST_HARNESS.getKafkaCluster().getClientProperties())) {
            final TopicDescription topicDescription =
                admin.describeTopics(Collections.singleton(topicName)).allTopicNames().get().get(topicName);

            final Map<TopicPartition, OffsetSpec> latestSpecs = topicDescription.partitions().stream()
                .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                admin.listOffsets(latestSpecs).all().get();

            final Map<TopicPartition, RecordsToDelete> recordsToDelete = endOffsets.entrySet().stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    entry -> RecordsToDelete.beforeOffset(entry.getValue().offset())));
            admin.deleteRecords(recordsToDelete).all().get();

            final Map<TopicPartition, OffsetSpec> earliestSpecs = topicDescription.partitions().stream()
                .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest()));
            final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> startOffsets =
                admin.listOffsets(earliestSpecs).all().get();

            // After the delete, each partition's earliest offset must equal its former end offset.
            for (final TopicPartition partition : Sets.union(startOffsets.keySet(), endOffsets.keySet())) {
                MatcherAssert.assertThat(endOffsets.get(partition), Matchers.notNullValue());
                MatcherAssert.assertThat(startOffsets.get(partition), Matchers.notNullValue());
                MatcherAssert.assertThat(
                    startOffsets.get(partition).offset(),
                    Matchers.is(endOffsets.get(partition).offset()));
            }
        }
    }

    /**
     * Appends the given settings to a properties-style file, one {@code key=value} line per entry.
     *
     * <p>The file is opened in append mode and written as UTF-8.
     *
     * @param str path of the file to write
     * @param map the configuration entries to write, in the map's iteration order
     * @throws Exception if the file cannot be opened
     */
    private static void writeConnectConfigs(String str, Map<String, String> map) throws Exception {
        // try-with-resources replaces the hand-unrolled close/suppress logic.
        try (PrintWriter writer = new PrintWriter(
                new OutputStreamWriter(new FileOutputStream(str, true), StandardCharsets.UTF_8))) {
            for (final Map.Entry<String, String> config : map.entrySet()) {
                writer.println(config.getKey() + "=" + config.getValue());
            }
        }
    }

    /**
     * One-time teardown: cleans up connectors, shuts down the Connect worker and issues a
     * TERMINATE statement for every persistent query reported by the REST app.
     */
    @AfterClass
    public static void classTearDown() {
        cleanupConnectors();
        CONNECT.shutdown();
        REST_APP.getPersistentQueries()
            .forEach(queryId -> makeKsqlRequest("TERMINATE " + queryId + ";"));
    }

    /** Creates a fresh Vert.x instance and a fresh client before each test. */
    @Before
    public void setUp() {
        vertx = Vertx.vertx();
        client = createClient();
    }

    /**
     * Releases per-test resources: the client and Vert.x instance (when they were created)
     * and the REST app's service context.
     */
    @After
    public void tearDown() {
        if (client != null) {
            client.close();
        }
        if (vertx != null) {
            vertx.close();
        }
        REST_APP.getServiceContext().close();
    }

    /**
     * Opens several push queries on the same client and verifies that each one reports the
     * expected metadata, delivers the stream rows and remains incomplete.
     */
    @Test
    public void shouldStreamMultiplePushQueries() throws Exception {
        // When: all queries are issued before any rows are consumed
        final StreamedQueryResult[] results = new StreamedQueryResult[NUM_CONCURRENT_REQUESTS_TO_TEST];
        for (int i = 0; i < results.length; i++) {
            results[i] = client.streamQuery(PUSH_QUERY).get();
        }

        // Then: every result carries the expected metadata
        for (final StreamedQueryResult result : results) {
            MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
            MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
            MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));
        }

        for (final StreamedQueryResult result : results) {
            shouldReceiveStreamRows(result, false);
        }

        // Push queries without a LIMIT are not expected to complete.
        for (final StreamedQueryResult result : results) {
            MatcherAssert.assertThat(result.isComplete(), Matchers.is(false));
        }
    }

    /**
     * Issues the same {@code CREATE STREAM IF NOT EXISTS ... AS SELECT} twice and verifies
     * that the second response carries no query id.
     */
    @Test
    public void shouldOnlyWarnForDuplicateIfNotExists() throws Exception {
        // Given
        final String createBar = "CREATE STREAM IF NOT EXISTS BAR AS SELECT * FROM FOO EMIT CHANGES;";
        client.executeStatement("CREATE STREAM FOO (id INT KEY, bar VARCHAR) WITH(value_format='json', kafka_topic='foo', partitions=6);").get();
        client.executeStatement(createBar).get();

        // When
        final ExecuteStatementResult duplicateResult = client.executeStatement(createBar).get();

        // Then
        MatcherAssert.assertThat(duplicateResult.queryId(), Matchers.is(Optional.empty()));
    }

    /**
     * Streams a push query, checks its metadata, consumes the rows through the
     * shouldReceiveStreamRows helper and verifies the query is still running.
     */
    @Test
    public void shouldStreamPushQueryAsync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery(PUSH_QUERY).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        shouldReceiveStreamRows(result, false);

        MatcherAssert.assertThat(result.isComplete(), Matchers.is(false));
    }

    /**
     * Streams a push query and consumes the rows synchronously via {@code poll()}, checking
     * each row against the expected row at the same index.
     */
    @Test
    public void shouldStreamPushQuerySync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery(PUSH_QUERY).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        // Plain unit step: the loop increment must not depend on an unrelated constant.
        for (int i = 0; i < TEST_NUM_ROWS; i++) {
            verifyStreamRowWithIndex(result.poll(), i);
        }

        MatcherAssert.assertThat(result.isComplete(), Matchers.is(false));
    }

    /**
     * Runs a pull query against the test stream and verifies that six rows are delivered
     * to a subscriber.
     */
    @Test
    public void shouldStreamPullQueryOnStreamAsync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery(PULL_QUERY_ON_STREAM).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        final int expectedRows = 6;
        ClientTestUtil.shouldReceiveRows(
            result,
            expectedRows,
            rows -> verifyStreamRows(rows, expectedRows),
            true);
    }

    /**
     * Runs a pull query against the test stream, drains it synchronously via {@code poll()}
     * until {@code null} is returned, and verifies six rows were received and the result
     * eventually completes.
     */
    @Test
    public void shouldStreamPullQueryOnStreamSync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery(PULL_QUERY_ON_STREAM).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        // Drain until poll() returns null.
        final List<Row> rows = new ArrayList<>();
        Row row;
        while ((row = result.poll()) != null) {
            rows.add(row);
        }

        verifyStreamRows(rows, 6);
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Runs a pull query against the empty stream, drains it synchronously via {@code poll()}
     * and verifies that no rows are returned and the result eventually completes.
     */
    @Test
    public void shouldStreamPullQueryOnEmptyStreamSync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery("SELECT * FROM " + EMPTY_TEST_STREAM + ";").get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        // Drain until poll() returns null.
        final List<Row> rows = new ArrayList<>();
        Row row;
        while ((row = result.poll()) != null) {
            rows.add(row);
        }

        verifyStreamRows(rows, 0);
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Truncates the backing topic, runs a pull query against the truncated stream, drains it
     * synchronously via {@code poll()} and verifies that no rows are returned and the result
     * eventually completes.
     */
    @Test
    public void shouldStreamPullQueryOnTruncatedStreamSync() throws Exception {
        // Given: the topic is truncated again immediately before the query
        truncateTopic(TRUNCATED_TEST_TOPIC);

        // When
        final StreamedQueryResult result = client.streamQuery("SELECT * FROM " + TRUNCATED_TEST_STREAM + ";").get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        // Drain until poll() returns null.
        final List<Row> rows = new ArrayList<>();
        Row row;
        while ((row = result.poll()) != null) {
            rows.add(row);
        }

        verifyStreamRows(rows, 0);
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Runs the table pull query, verifies its metadata, checks the single delivered row via a
     * subscriber and waits for the result to complete.
     */
    @Test
    public void shouldStreamPullQueryOnTableAsync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery(PULL_QUERY_ON_TABLE).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(PULL_QUERY_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(PULL_QUERY_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        ClientTestUtil.shouldReceiveRows(
            result,
            EVENT_LOOP_POOL_SIZE,
            ClientIntegrationTest::verifyPullQueryRows,
            true);

        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Runs the table pull query and reads it synchronously: the first {@code poll()} must
     * yield the expected row, the second must yield {@code null}, and the result must
     * eventually complete.
     */
    @Test
    public void shouldStreamPullQueryOnTableSync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery(PULL_QUERY_ON_TABLE).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(PULL_QUERY_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(PULL_QUERY_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        final Row firstRow = result.poll();
        verifyPullQueryRow(firstRow);

        final Row afterLast = result.poll();
        MatcherAssert.assertThat(afterLast, Matchers.is(Matchers.nullValue()));

        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Streams a push query with a LIMIT clause and verifies that the limited number of rows
     * is received via the shouldReceiveStreamRows helper and that the result is then complete.
     */
    @Test
    public void shouldStreamPushQueryWithLimitAsync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery(PUSH_QUERY_WITH_LIMIT).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        shouldReceiveStreamRows(result, true, PUSH_QUERY_LIMIT_NUM_ROWS);

        MatcherAssert.assertThat(result.isComplete(), Matchers.is(true));
    }

    /**
     * Streams a push query with a LIMIT clause and consumes it synchronously: the limited
     * number of rows is polled and verified, the next {@code poll()} must return
     * {@code null}, and the result must be complete.
     */
    @Test
    public void shouldStreamPushQueryWithLimitSync() throws Exception {
        // When
        final StreamedQueryResult result = client.streamQuery(PUSH_QUERY_WITH_LIMIT).get();

        // Then
        MatcherAssert.assertThat(result.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(result.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(result.queryID(), Matchers.is(Matchers.notNullValue()));

        // Plain unit step: the loop increment must not depend on an unrelated constant.
        for (int i = 0; i < PUSH_QUERY_LIMIT_NUM_ROWS; i++) {
            verifyStreamRowWithIndex(result.poll(), i);
        }

        MatcherAssert.assertThat(result.poll(), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(result.isComplete(), Matchers.is(true));
    }

    /**
     * Verifies that streaming a query against a source that does not exist fails with a
     * {@link KsqlClientException} describing the server's 400 response.
     */
    @Test
    public void shouldHandleErrorResponseFromStreamQuery() {
        // When: the executable must actually issue the failing request; an empty lambda
        // throws nothing and makes assertThrows fail unconditionally.
        final Exception e = Assert.assertThrows(
            ExecutionException.class, // thrown by get() when the future completes exceptionally
            () -> client.streamQuery("SELECT * FROM NONEXISTENT EMIT CHANGES;").get());

        // Then
        MatcherAssert.assertThat(e.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(e.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(e.getCause().getMessage(), Matchers.containsString("NONEXISTENT does not exist"));
    }

    /**
     * Waits for a limited push query to complete before reading anything, then verifies that
     * the rows can still be retrieved via {@code poll()} and that the following
     * {@code poll()} returns {@code null}.
     */
    @Test
    public void shouldDeliverBufferedRowsViaPollIfComplete() throws Exception {
        // Given
        final StreamedQueryResult result = client.streamQuery(PUSH_QUERY_WITH_LIMIT).get();
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));

        // When / Then
        // Plain unit step: the loop increment must not depend on an unrelated constant.
        for (int i = 0; i < PUSH_QUERY_LIMIT_NUM_ROWS; i++) {
            verifyStreamRowWithIndex(result.poll(), i);
        }
        MatcherAssert.assertThat(result.poll(), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Waits for a limited push query to complete, then subscribes to it and verifies that the
     * rows are delivered once demand is signalled and that no error is reported.
     */
    @Test
    public void shouldAllowSubscribeStreamedQueryResultIfComplete() throws Exception {
        // Given
        final StreamedQueryResult result = client.streamQuery(PUSH_QUERY_WITH_LIMIT).get();
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));

        // When
        final ClientTestUtil.TestSubscriber<Row> subscriber = ClientTestUtil.subscribeAndWait(result);
        // Nothing is delivered before demand is requested.
        MatcherAssert.assertThat(subscriber.getValues(), Matchers.hasSize(0));
        subscriber.getSub().request(2L);

        // Then
        AssertEventually.assertThatEventually(subscriber::getValues, Matchers.hasSize(PUSH_QUERY_LIMIT_NUM_ROWS));
        verifyStreamRows(subscriber.getValues(), PUSH_QUERY_LIMIT_NUM_ROWS);
        MatcherAssert.assertThat(subscriber.getError(), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Executes the table pull query in batch mode and verifies that a query id is reported
     * and the returned rows match the expected pull-query rows.
     */
    @Test
    public void shouldExecutePullQuery() throws Exception {
        // When
        final BatchedQueryResult batchedResult = client.executeQuery(PULL_QUERY_ON_TABLE);

        // Then
        final String queryId = batchedResult.queryID().get();
        MatcherAssert.assertThat(queryId, Matchers.is(Matchers.notNullValue()));

        final List<Row> rows = batchedResult.get();
        verifyPullQueryRows(rows);
    }

    /**
     * Defines session variables on the client and verifies that they are substituted into a
     * pull query: the selected variable value must come back in the first column.
     */
    @Test
    public void shouldExecutePullQueryWithVariables() throws Exception {
        // Given: the variable name is spelled out as a literal to match the ${AGG_TABLE}
        // placeholder instead of relying on the constant's value equalling its own name.
        client.define("AGG_TABLE", AGG_TABLE);
        client.define("value", false);

        // When
        final BatchedQueryResult batchedResult =
            client.executeQuery("SELECT ${value} from ${AGG_TABLE} WHERE K=STRUCT(F1 := ARRAY['a']);");

        // Then
        MatcherAssert.assertThat(batchedResult.queryID().get(), Matchers.is(Matchers.notNullValue()));
        // Column index 1, written as a literal instead of an unrelated constant that equals 1.
        MatcherAssert.assertThat(batchedResult.get().get(0).getBoolean(1), Matchers.is(false));
    }

    /**
     * Executes the limited push query in batch mode and verifies that a query id is reported
     * and the limited number of stream rows is returned.
     */
    @Test
    public void shouldExecutePushWithLimitQuery() throws Exception {
        // When
        final BatchedQueryResult batchedResult = client.executeQuery(PUSH_QUERY_WITH_LIMIT);

        // Then
        final String queryId = batchedResult.queryID().get();
        MatcherAssert.assertThat(queryId, Matchers.is(Matchers.notNullValue()));

        final List<Row> rows = batchedResult.get();
        verifyStreamRows(rows, PUSH_QUERY_LIMIT_NUM_ROWS);
    }

    /**
     * Defines session variables on the client and verifies that they are substituted into a
     * limited push query: the selected variable value must come back in the first column.
     */
    @Test
    public void shouldExecutePushQueryWithVariables() throws Exception {
        // Given
        client.define("TEST_STREAM", TEST_STREAM);
        client.define("number", 4567);

        // When
        final BatchedQueryResult batchedResult =
            client.executeQuery("SELECT ${number} FROM ${TEST_STREAM} EMIT CHANGES LIMIT 2;");

        // Then
        MatcherAssert.assertThat(batchedResult.queryID().get(), Matchers.is(Matchers.notNullValue()));
        // Column index 1, written as a literal instead of an unrelated constant that equals 1.
        MatcherAssert.assertThat(batchedResult.get().get(0).getInteger(1), Matchers.is(4567));
    }

    /**
     * Verifies that executing a query against a table that does not exist fails with a
     * {@link KsqlClientException} describing the server's 400 response, both on the result
     * future and on the query id future.
     */
    @Test
    public void shouldHandleErrorResponseFromExecuteQuery() {
        // When
        final BatchedQueryResult batchedResult = client.executeQuery("SELECT * from A_FAKE_TABLE_NAME;");
        final Exception e = Assert.assertThrows(
            ExecutionException.class, // thrown by get() when the future completes exceptionally
            batchedResult::get);

        // Then
        MatcherAssert.assertThat(e.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(e.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(e.getCause().getMessage(), Matchers.containsString("A_FAKE_TABLE_NAME does not exist."));

        // The query id future must fail the same way. The executable has to call it:
        // an empty lambda throws nothing and makes assertThrows fail unconditionally.
        final Exception queryIdException = Assert.assertThrows(
            ExecutionException.class,
            () -> batchedResult.queryID().get());
        MatcherAssert.assertThat(queryIdException.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(queryIdException.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(queryIdException.getCause().getMessage(), Matchers.containsString("A_FAKE_TABLE_NAME does not exist."));
    }

    /**
     * Starts a push query via {@code streamQuery()}, terminates it by id and verifies that the
     * number of active queries drops back and the streamed result eventually completes.
     */
    @Test
    public void shouldTerminatePushQueryIssuedViaStreamQuery() throws Exception {
        // Given: one query is already active before the push query starts
        // (presumably the persistent query backing AGG_TABLE - confirm).
        verifyNumActiveQueries(1);

        final StreamedQueryResult result = client.streamQuery(PUSH_QUERY).get();
        final String queryId = result.queryID();
        MatcherAssert.assertThat(queryId, Matchers.is(Matchers.notNullValue()));

        // The push query is now active as well, and the result is not complete.
        verifyNumActiveQueries(2);
        MatcherAssert.assertThat(result.isComplete(), Matchers.is(false));

        // When
        client.terminatePushQuery(queryId).get();

        // Then
        verifyNumActiveQueries(1);
        AssertEventually.assertThatEventually(result::isComplete, Matchers.is(true));
    }

    /**
     * Starts {@code PUSH_QUERY} via {@code executeQuery()}, terminates it by query ID and checks
     * the number of live queries before, during and after, and that the batched result finishes
     * without an exception.
     */
    @Test
    public void shouldTerminatePushQueryIssuedViaExecuteQuery() throws Exception {
        // Expected live-query counts are written as plain numbers; they were previously spelled
        // with the unrelated EVENT_LOOP_POOL_SIZE / PUSH_QUERY_LIMIT_NUM_ROWS constants.
        // NOTE(review): assumes those constants are 1 and 2 respectively - confirm.
        verifyNumActiveQueries(1);

        BatchedQueryResult executeQuery = this.client.executeQuery(PUSH_QUERY);
        String queryId = executeQuery.queryID().get();
        MatcherAssert.assertThat(queryId, Matchers.is(Matchers.notNullValue()));

        // One more query is live while the push query runs.
        verifyNumActiveQueries(2);
        MatcherAssert.assertThat(executeQuery.isDone(), Matchers.is(false));

        this.client.terminatePushQuery(queryId).get();

        verifyNumActiveQueries(1);
        AssertEventually.assertThatEventually(executeQuery::isDone, Matchers.is(true));
        MatcherAssert.assertThat(executeQuery.isCompletedExceptionally(), Matchers.is(false));
    }

    /**
     * Expects terminating an unknown query ID to fail with an {@code ExecutionException} caused
     * by a {@code KsqlClientException} carrying the server's 400 response.
     */
    @Test
    public void shouldHandleErrorResponseFromTerminatePushQuery() {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): call restored from the asserted "No query with id NONEXISTENT" - confirm.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.terminatePushQuery("NONEXISTENT").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("No query with id NONEXISTENT"));
    }

    /**
     * Sends a raw {@code GET /info} request through the client and compares the
     * {@code KsqlServerInfo} section of the body with what {@code serverInfo()} reports.
     */
    @Test
    public void shouldExecutePlainHttpRequests() throws Exception {
        Client.HttpResponse response = this.client.buildRequest("GET", "/info").send().get();
        MatcherAssert.assertThat(response.status(), Matchers.is(200));

        Map<?, ?> body = response.bodyAsMap();
        ServerInfo expected = this.client.serverInfo().get();
        Map<?, ?> info = (Map<?, ?>) body.get("KsqlServerInfo");

        MatcherAssert.assertThat(info.get("version"), Matchers.is(expected.getServerVersion()));
        MatcherAssert.assertThat(info.get("ksqlServiceId"), Matchers.is(expected.getKsqlServiceId()));
        MatcherAssert.assertThat(info.get("kafkaClusterId"), Matchers.is(expected.getKafkaClusterId()));
        MatcherAssert.assertThat(info.get("serverStatus"), Matchers.is("RUNNING"));
    }

    /**
     * Expects a syntactically invalid statement passed to {@code executeStatement()} to fail
     * with a {@code KsqlClientException} reporting a 400 response and a syntax error.
     */
    @Test
    public void shouldHandleInvalidSqlInExecuteStatement() {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): the original statement is not recoverable; any unparseable,
        // semicolon-terminated text should produce the asserted "Syntax Error" - confirm.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.executeStatement("bad sql;").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Syntax Error"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Error code: 40001"));
    }

    /**
     * Expects a statement that references the non-existent stream {@code NONEXISTENT} to fail
     * with a {@code KsqlClientException} reporting a 400 response.
     */
    @Test
    public void shouldHandleErrorResponseFromExecuteStatement() {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): statement restored from the asserted "Stream NONEXISTENT does not
        // exist" message - confirm.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.executeStatement("drop stream NONEXISTENT;").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Stream NONEXISTENT does not exist"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Error code: 40001"));
    }

    /**
     * Runs the same {@code create table if not exists} statement twice and checks that the
     * second result carries no query ID.
     */
    @Test
    public void shouldHandleWarningResponseFromExecuteStatement() throws ExecutionException, InterruptedException {
        this.client.executeStatement("create table if not exists tasks (taskId varchar primary key) with (kafka_topic='tasks', value_format='json', partitions=1);").get();

        ExecuteStatementResult secondResult = this.client.executeStatement("create table if not exists tasks (taskId varchar primary key) with (kafka_topic='tasks', value_format='json', partitions=1);").get();
        boolean hasQueryId = secondResult.queryId().isPresent();
        Assert.assertFalse(hasQueryId);
    }

    /**
     * Expects a connector statement issued through {@code executeStatement()} to be rejected by
     * the client with a pointer to {@code dropConnector()}.
     */
    @Test
    public void shouldRejectWarningsFromConnectorRequestsInExecuteStatement() throws Exception {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): statement reconstructed from the test name and the asserted
        // "Use the dropConnector() method instead." hint - confirm the exact SQL.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.executeStatement("drop connector if exists nonexistent;").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("The ksqlDB server accepted the statement issued via executeStatement(), but the response received is of an unexpected format."));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Use the dropConnector() method instead."));
    }

    /**
     * Expects {@code executeStatement()} to reject input that contains more than one statement.
     */
    @Test
    public void shouldRejectMultipleRequestsFromExecuteStatement() {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): the original statements are not recoverable; two semicolon-terminated
        // statements are used to trigger the asserted message - confirm.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.executeStatement("CREATE STREAM FOO AS SELECT * FROM " + TEST_STREAM + "; CREATE STREAM BAR AS SELECT * FROM " + TEST_STREAM + ";").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("executeStatement() may only be used to execute one statement at a time"));
    }

    /**
     * Expects {@code executeStatement()} to reject SQL that is not terminated by a semicolon.
     */
    @Test
    public void shouldRejectRequestWithMissingSemicolonFromExecuteStatement() {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): the original text is not recoverable; any input without a semicolon
        // should trigger the asserted message - confirm.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.executeStatement("sql missing semicolon").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Missing semicolon in SQL for executeStatement() request"));
    }

    /**
     * Expects {@code executeStatement()} to fail with the "request accepted" and usage messages
     * when the server's response contains no entities.
     */
    @Test
    public void shouldFailOnNoEntitiesFromExecuteStatement() {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): statement reconstructed; presumably a SET statement is answered with
        // an empty entity list - confirm against the server behaviour.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.executeStatement("set 'auto.offset.reset' = 'earliest';").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
    }

    /**
     * Expects listing streams through {@code executeStatement()} to be rejected with a pointer
     * to {@code listStreams()}.
     */
    @Test
    public void shouldFailToListStreamsViaExecuteStatement() {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): statement restored from the test name and the asserted
        // "Use the listStreams() method instead" hint - confirm.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.executeStatement("list streams;").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString(EXECUTE_STATEMENT_USAGE_DOC));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Use the listStreams() method instead"));
    }

    /**
     * Checks that {@code listStreams()} returns exactly the streams of the three test data
     * providers, in any order.
     */
    @Test
    public void shouldListStreams() throws Exception {
        List streams = (List) this.client.listStreams().get();
        Matcher[] expectedStreams = new Matcher[]{
            streamForProvider(TEST_DATA_PROVIDER),
            streamForProvider(EMPTY_TEST_DATA_PROVIDER),
            streamForProvider(TRUNCATED_TEST_DATA_PROVIDER)
        };
        // The listing itself is used as the failure reason.
        MatcherAssert.assertThat(String.valueOf(streams), streams, Matchers.containsInAnyOrder(expectedStreams));
    }

    /**
     * Checks that {@code listTables()} returns only the aggregate table, non-windowed and with
     * the configured key and value formats.
     */
    @Test
    public void shouldListTables() throws Exception {
        List tables = (List) this.client.listTables().get();
        // The listing itself is used as the failure reason.
        MatcherAssert.assertThat(
            String.valueOf(tables),
            tables,
            Matchers.contains(tableInfo(AGG_TABLE, AGG_TABLE, KEY_FORMAT.name(), VALUE_FORMAT.name(), false)));
    }

    /**
     * Polls {@code listTopics()} until it returns the expected five topics, in any order.
     */
    @Test
    public void shouldListTopics() throws Exception {
        AssertEventually.assertThatEventually(
            () -> {
                try {
                    return (List) this.client.listTopics().get();
                } catch (InterruptedException | ExecutionException ignored) {
                    // A failed attempt yields null so that the matcher fails for this attempt.
                    return null;
                }
            },
            Matchers.containsInAnyOrder(new Matcher[]{
                topicInfo(TEST_TOPIC),
                topicInfo(EMPTY_TEST_TOPIC),
                topicInfo(TRUNCATED_TEST_TOPIC),
                topicInfo(AGG_TABLE),
                topicInfo("connect-config")
            }));
    }

    /**
     * Checks that {@code listQueries()} contains the persistent query that populates the
     * aggregate table, including its SQL text, sink and sink topic.
     */
    @Test
    public void shouldListQueries() throws ExecutionException, InterruptedException {
        String expectedSql = "CREATE TABLE AGG_TABLE WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='AGG_TABLE', PARTITIONS=1, REPLICAS=1, RETENTION_MS=-1) AS SELECT\n  " + TEST_STREAM + ".K K,\n  LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\nFROM " + TEST_STREAM + " " + TEST_STREAM + "\nGROUP BY " + TEST_STREAM + ".K\nEMIT CHANGES;";

        MatcherAssert.assertThat(
            (List) this.client.listQueries().get(),
            Matchers.hasItem(Matchers.allOf(
                Matchers.hasProperty("queryType", Matchers.is(QueryInfo.QueryType.PERSISTENT)),
                Matchers.hasProperty("id", Matchers.startsWith("CTAS_AGG_TABLE")),
                Matchers.hasProperty("sql", Matchers.is(expectedSql)),
                Matchers.hasProperty("sink", Matchers.is(Optional.of(AGG_TABLE))),
                Matchers.hasProperty("sinkTopic", Matchers.is(Optional.of(AGG_TABLE))))));
    }

    /**
     * Describes {@code TEST_STREAM} and checks its name, type, every field, topic, formats,
     * read/write queries, timestamp column, window type and defining SQL statement.
     */
    @Test
    public void shouldDescribeSource() throws Exception {
        SourceDescription description = this.client.describeSource(TEST_STREAM).get();

        MatcherAssert.assertThat(description.name(), Matchers.is(TEST_STREAM));
        MatcherAssert.assertThat(description.type(), Matchers.is("STREAM"));
        MatcherAssert.assertThat(description.fields(), Matchers.hasSize(TEST_COLUMN_NAMES.size()));

        // Step by one so that every field is checked. The stride used to be
        // EVENT_LOOP_POOL_SIZE, which would skip fields if the pool size ever changed.
        for (int i = 0; i < TEST_COLUMN_NAMES.size(); i++) {
            FieldInfo field = description.fields().get(i);
            MatcherAssert.assertThat(field.name(), Matchers.is(TEST_COLUMN_NAMES.get(i)));
            MatcherAssert.assertThat(field.type().getType(), Matchers.is(TEST_COLUMN_TYPES.get(i).getType()));
            MatcherAssert.assertThat(field.isKey(), Matchers.is(TEST_COLUMN_NAMES.get(i).equals(TEST_DATA_PROVIDER.key())));
        }

        MatcherAssert.assertThat(description.topic(), Matchers.is(TEST_TOPIC));
        MatcherAssert.assertThat(description.keyFormat(), Matchers.is("JSON"));
        MatcherAssert.assertThat(description.valueFormat(), Matchers.is("JSON"));

        // Exactly one query reads from the stream.
        // NOTE(review): literal assumes EVENT_LOOP_POOL_SIZE == 1, which was used here - confirm.
        MatcherAssert.assertThat(description.readQueries(), Matchers.hasSize(1));
        QueryInfo readQuery = description.readQueries().get(0);
        MatcherAssert.assertThat(readQuery.getQueryType(), Matchers.is(QueryInfo.QueryType.PERSISTENT));
        MatcherAssert.assertThat(readQuery.getId(), Matchers.startsWith("CTAS_AGG_TABLE"));
        MatcherAssert.assertThat(readQuery.getSql(), Matchers.is("CREATE TABLE AGG_TABLE WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='AGG_TABLE', PARTITIONS=1, REPLICAS=1, RETENTION_MS=-1) AS SELECT\n  " + TEST_STREAM + ".K K,\n  LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\nFROM " + TEST_STREAM + " " + TEST_STREAM + "\nGROUP BY " + TEST_STREAM + ".K\nEMIT CHANGES;"));
        MatcherAssert.assertThat(readQuery.getSink(), Matchers.is(Optional.of(AGG_TABLE)));
        MatcherAssert.assertThat(readQuery.getSinkTopic(), Matchers.is(Optional.of(AGG_TABLE)));

        MatcherAssert.assertThat(description.writeQueries(), Matchers.hasSize(0));
        MatcherAssert.assertThat(description.timestampColumn(), Matchers.is(Optional.empty()));
        MatcherAssert.assertThat(description.windowType(), Matchers.is(Optional.empty()));
        MatcherAssert.assertThat(description.sqlStatement(), Matchers.is("CREATE STREAM " + TEST_STREAM + " (K STRUCT<F1 ARRAY<STRING>> KEY, STR STRING, LONG BIGINT, DEC DECIMAL(4, 2), BYTES_ BYTES, ARRAY ARRAY<STRING>, MAP MAP<STRING, STRING>, STRUCT STRUCT<F1 INTEGER>, COMPLEX STRUCT<`DECIMAL` DECIMAL(2, 1), STRUCT STRUCT<F1 STRING, F2 INTEGER>, ARRAY_ARRAY ARRAY<ARRAY<STRING>>, ARRAY_STRUCT ARRAY<STRUCT<F1 STRING>>, ARRAY_MAP ARRAY<MAP<STRING, INTEGER>>, MAP_ARRAY MAP<STRING, ARRAY<STRING>>, MAP_MAP MAP<STRING, MAP<STRING, INTEGER>>, MAP_STRUCT MAP<STRING, STRUCT<F1 STRING>>>, TIMESTAMP TIMESTAMP, DATE DATE, TIME TIME, HEAD BYTES HEADER('h0')) WITH (CLEANUP_POLICY='delete', KAFKA_TOPIC='STRUCTURED_TYPES_TOPIC', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');"));
    }

    /**
     * Expects describing the unknown source {@code NONEXISTENT} to fail with a
     * {@code KsqlClientException} reporting a 400 response.
     */
    @Test
    public void shouldHandleErrorResponseFromDescribeSource() {
        // The lambda used to be empty, which can never throw and made assertThrows fail
        // unconditionally.
        // NOTE(review): call restored from the asserted "Could not find STREAM/TABLE
        // 'NONEXISTENT'" message - confirm.
        Exception exc = Assert.assertThrows(ExecutionException.class, () -> {
            this.client.describeSource("NONEXISTENT").get();
        });
        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Received 400 response from server"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Could not find STREAM/TABLE 'NONEXISTENT' in the Metastore"));
        MatcherAssert.assertThat(exc.getCause().getMessage(), Matchers.containsString("Error code: 40001"));
    }

    /**
     * Compares {@code serverInfo()} with the application version, the {@code default_} service
     * ID and the cluster ID reported by the Kafka admin client.
     */
    @Test
    public void shouldGetServerInfo() throws Exception {
        String expectedClusterId = REST_APP.getServiceContext()
            .getAdminClient()
            .describeCluster()
            .clusterId()
            .get();

        ServerInfo info = this.client.serverInfo().get();

        MatcherAssert.assertThat(info.getServerVersion(), Matchers.is(AppInfo.getVersion()));
        MatcherAssert.assertThat(info.getKsqlServiceId(), Matchers.is("default_"));
        MatcherAssert.assertThat(info.getKafkaClusterId(), Matchers.is(expectedClusterId));
    }

    /**
     * Creates the test connector and checks that {@code listConnectors()} returns exactly that
     * connector with the expected name, class, state and type.
     */
    @Test
    public void shouldListConnectors() throws Exception {
        givenConnectorExists();

        List<ConnectorInfo> connectors = this.client.listConnectors().get();

        // Exactly one connector is expected. The count used to be spelled with the unrelated
        // EVENT_LOOP_POOL_SIZE constant.
        // NOTE(review): assumes that constant is 1 - confirm.
        MatcherAssert.assertThat(connectors.size(), Matchers.is(1));
        ConnectorInfo connector = connectors.get(0);
        MatcherAssert.assertThat(connector.name(), Matchers.is(TEST_CONNECTOR));
        MatcherAssert.assertThat(connector.className(), Matchers.is(MOCK_SOURCE_CLASS));
        MatcherAssert.assertThat(connector.state(), Matchers.is("RUNNING (1/1 tasks RUNNING)"));
        MatcherAssert.assertThat(connector.type(), Matchers.is(SOURCE_TYPE));
    }

    /**
     * Creates the test connector and checks the type, state, topics, sources and class name
     * returned by {@code describeConnector()}.
     */
    @Test
    public void shouldDescribeConnector() throws Exception {
        givenConnectorExists();

        ConnectorDescription connector = this.client.describeConnector(TEST_CONNECTOR).get();

        MatcherAssert.assertThat(connector.type(), Matchers.is(SOURCE_TYPE));
        MatcherAssert.assertThat(connector.state(), Matchers.is("RUNNING"));
        MatcherAssert.assertThat(connector.topics().size(), Matchers.is(0));
        MatcherAssert.assertThat(connector.sources().size(), Matchers.is(0));
        MatcherAssert.assertThat(connector.className(), Matchers.is(MOCK_SOURCE_CLASS));
    }

    /**
     * Creates the test connector, drops it through the client and polls until
     * {@code listConnectors()} is empty.
     */
    @Test
    public void shouldDropConnector() throws Exception {
        givenConnectorExists();

        this.client.dropConnector(TEST_CONNECTOR).get();

        AssertEventually.assertThatEventually(
            () -> {
                try {
                    List connectors = (List) this.client.listConnectors().get();
                    return Integer.valueOf(connectors.size());
                } catch (InterruptedException | ExecutionException ignored) {
                    // A failed attempt yields null so that the matcher fails for this attempt.
                    return null;
                }
            },
            Matchers.is(0));
    }

    /**
     * Drops a connector name that is not expected to exist, passing {@code true} as the second
     * argument; the test passes as long as waiting on the result does not throw.
     */
    @Test
    public void shouldNotFailToDropNonExistantConnector() throws Exception {
        final String connectorName = "nonExistentConnector";
        this.client.dropConnector(connectorName, true).get();
    }

    /**
     * Creates connector {@code FOO} with the mock source class and polls
     * {@code describeConnector()} until its state is {@code RUNNING}.
     */
    @Test
    public void shouldCreateConnector() throws Exception {
        this.client.createConnector("FOO", true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS)).get();

        AssertEventually.assertThatEventually(
            () -> {
                try {
                    ConnectorDescription connector = (ConnectorDescription) this.client.describeConnector("FOO").get();
                    return connector.state();
                } catch (InterruptedException | ExecutionException ignored) {
                    // A failed attempt yields null so that the matcher fails for this attempt.
                    return null;
                }
            },
            Matchers.is("RUNNING"));
    }

    /**
     * Creates the test connector a second time, passing {@code true} as the last argument; the
     * test passes as long as waiting on the result does not throw.
     */
    @Test
    public void shouldNotFailToCreateConnectorThatExists() throws Exception {
        givenConnectorExists();

        this.client
            .createConnector(TEST_CONNECTOR, true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS), true)
            .get();
    }

    /**
     * Defines the session variable {@code class}, creates connector {@code FOO} whose
     * {@code connector.class} is {@code ${class}}, and polls until the connector is
     * {@code RUNNING}.
     */
    @Test
    public void shouldCreateConnectorWithVariables() throws Exception {
        this.client.define("class", MOCK_SOURCE_CLASS);

        this.client.createConnector("FOO", true, ImmutableMap.of("connector.class", "${class}")).get();

        AssertEventually.assertThatEventually(
            () -> {
                try {
                    ConnectorDescription connector = (ConnectorDescription) this.client.describeConnector("FOO").get();
                    return connector.state();
                } catch (InterruptedException | ExecutionException ignored) {
                    // A failed attempt yields null so that the matcher fails for this attempt.
                    return null;
                }
            },
            Matchers.is("RUNNING"));
    }

    /**
     * Creates a client that targets {@code localhost} on the port of the REST app's first
     * listener, using this test's Vert.x instance.
     *
     * @return the newly created client
     */
    private Client createClient() {
        int port = ((URL) REST_APP.getListeners().get(0)).getPort();
        ClientOptions options = ClientOptions.create()
            .setHost("localhost")
            .setPort(port);
        return Client.create(options, this.vertx);
    }

    /**
     * Removes all existing connectors, creates {@code TEST_CONNECTOR} through the REST API and
     * waits until {@code SHOW CONNECTORS;} reports exactly one connector.
     */
    private void givenConnectorExists() {
        cleanupConnectors();

        makeKsqlRequest("CREATE SOURCE CONNECTOR TEST_CONNECTOR WITH ('connector.class'='org.apache.kafka.connect.tools.MockSourceConnector');");

        // NOTE(review): makeKsqlRequest() is declared to return KsqlEntity elements; the
        // getConnectors() call presumably needs a cast to the connector-list entity - confirm.
        AssertEventually.assertThatEventually(() -> {
            try {
                return Integer.valueOf(makeKsqlRequest("SHOW CONNECTORS;").get(0).getConnectors().size());
            } catch (AssertionError e) {
                // A failed request counts as "no connectors yet" so that polling continues.
                return 0;
            }
        // Exactly one connector is expected. The count used to be spelled with the unrelated
        // EVENT_LOOP_POOL_SIZE constant.
        // NOTE(review): assumes that constant is 1 - confirm.
        }, Matchers.is(1));
    }

    /**
     * Drops every connector reported by {@code SHOW CONNECTORS;} and waits until the listing is
     * empty.
     */
    private static void cleanupConnectors() {
        makeKsqlRequest("SHOW CONNECTORS;")
            .get(0)
            .getConnectors()
            .forEach(connector -> makeKsqlRequest("DROP CONNECTOR " + connector.getName() + ";"));

        AssertEventually.assertThatEventually(
            () -> Integer.valueOf(makeKsqlRequest("SHOW CONNECTORS;").get(0).getConnectors().size()),
            Matchers.is(0));
    }

    /**
     * Sends a statement to the shared REST app through {@code RestIntegrationTestUtil}.
     *
     * <p>JADX INFO: Access modifiers changed from: private
     *
     * @param str the statement text to send
     * @return the entities returned for the request
     */
    public static List<KsqlEntity> makeKsqlRequest(String str) {
        List<KsqlEntity> entities = RestIntegrationTestUtil.makeKsqlRequest(REST_APP, str);
        return entities;
    }

    /**
     * Polls the REST app's engine until its number of live queries equals the expected count.
     *
     * @param i the expected number of live queries
     */
    private static void verifyNumActiveQueries(int i) {
        KsqlEngine ksqlEngine = REST_APP.getEngine();
        // Binding the method reference already null-checks the engine, so no separate check
        // is needed.
        AssertEventually.assertThatEventually(ksqlEngine::numberOfLiveQueries, Matchers.is(Integer.valueOf(i)));
    }

    /**
     * Convenience overload that expects {@code TEST_NUM_ROWS} rows from the publisher.
     *
     * @param publisher the row publisher under test
     * @param z passed through unchanged to the three-argument overload
     */
    private static void shouldReceiveStreamRows(Publisher<Row> publisher, boolean z) {
        final int expectedRows = TEST_NUM_ROWS;
        shouldReceiveStreamRows(publisher, z, expectedRows);
    }

    /**
     * Delegates to {@code ClientTestUtil.shouldReceiveRows}, verifying the received rows with
     * {@code verifyStreamRows}.
     *
     * @param publisher the row publisher under test
     * @param z passed through unchanged as the last argument of {@code shouldReceiveRows}
     * @param i the number of rows expected
     */
    private static void shouldReceiveStreamRows(Publisher<Row> publisher, boolean z, int i) {
        ClientTestUtil.shouldReceiveRows(
            publisher,
            i,
            receivedRows -> verifyStreamRows(receivedRows, i),
            z);
    }

    /**
     * Verifies the first {@code i} rows against the expected stream rows and fails if the list
     * does not contain exactly {@code i} rows.
     *
     * <p>JADX INFO: Access modifiers changed from: private
     *
     * @param list the rows received
     * @param i the number of rows expected
     */
    public static void verifyStreamRows(List<Row> list, int i) {
        // Step by one so that every row is verified. The stride used to be
        // EVENT_LOOP_POOL_SIZE, which would skip rows if the pool size ever changed.
        int rowsToVerify = Math.min(i, list.size());
        for (int index = 0; index < rowsToVerify; index++) {
            verifyStreamRowWithIndex(list.get(index), index);
        }

        if (list.size() < i) {
            Assert.fail("Expected " + i + " but only got " + list.size());
        } else if (list.size() > i) {
            Assert.fail("Expected " + i + " but got " + list.size() + ". The extra rows were: " + list.subList(i, list.size()));
        }
        MatcherAssert.assertThat(list, Matchers.hasSize(i));
    }

    /**
     * Verifies one stream row against {@code TEST_EXPECTED_ROWS.get(i)}: its values and
     * metadata, access by column name, access by position, and its {@code KsqlArray} and
     * {@code KsqlObject} views.
     *
     * <p>Column positions are written as literals. They used to be spelled with unrelated
     * constants ({@code EVENT_LOOP_POOL_SIZE}, {@code PUSH_QUERY_LIMIT_NUM_ROWS},
     * {@code NUM_CONCURRENT_REQUESTS_TO_TEST}) that fill the gaps in the index sequence.
     * NOTE(review): assumes those constants are 1, 2 and 5 - confirm.
     *
     * @param row the row received from the client
     * @param i the index of the expected row in {@code TEST_EXPECTED_ROWS}
     */
    private static void verifyStreamRowWithIndex(Row row, int i) {
        KsqlArray expectedRow = TEST_EXPECTED_ROWS.get(i);

        // Row values and metadata.
        MatcherAssert.assertThat(row.values(), Matchers.equalTo(expectedRow));
        MatcherAssert.assertThat(row.columnNames(), Matchers.equalTo(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(row.columnTypes(), Matchers.equalTo(TEST_COLUMN_TYPES));

        // Access by column name, compared with the expected array by its 0-based position.
        MatcherAssert.assertThat(row.getKsqlObject("K"), Matchers.is(expectedRow.getKsqlObject(0)));
        MatcherAssert.assertThat(row.getString("STR"), Matchers.is(expectedRow.getString(1)));
        MatcherAssert.assertThat(row.getLong("LONG"), Matchers.is(expectedRow.getLong(2)));
        MatcherAssert.assertThat(row.getDecimal("DEC"), Matchers.is(expectedRow.getDecimal(3)));
        MatcherAssert.assertThat(row.getBytes("BYTES_"), Matchers.is(expectedRow.getBytes(4)));
        MatcherAssert.assertThat(row.getKsqlArray("ARRAY"), Matchers.is(expectedRow.getKsqlArray(5)));
        MatcherAssert.assertThat(row.getKsqlObject("MAP"), Matchers.is(expectedRow.getKsqlObject(6)));
        MatcherAssert.assertThat(row.getKsqlObject("STRUCT"), Matchers.is(expectedRow.getKsqlObject(7)));
        MatcherAssert.assertThat(row.getKsqlObject("COMPLEX"), Matchers.is(expectedRow.getKsqlObject(8)));

        // Access by position on the row starts at 1.
        MatcherAssert.assertThat(row.getKsqlObject(1), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(row.getString(2), Matchers.is(row.getString("STR")));
        MatcherAssert.assertThat(row.getLong(3), Matchers.is(row.getLong("LONG")));
        MatcherAssert.assertThat(row.getDecimal(4), Matchers.is(row.getDecimal("DEC")));
        MatcherAssert.assertThat(row.getBytes(5), Matchers.is(row.getBytes("BYTES_")));
        MatcherAssert.assertThat(row.getKsqlArray(6), Matchers.is(row.getKsqlArray("ARRAY")));
        MatcherAssert.assertThat(row.getKsqlObject(7), Matchers.is(row.getKsqlObject("MAP")));
        MatcherAssert.assertThat(row.getKsqlObject(8), Matchers.is(row.getKsqlObject("STRUCT")));
        MatcherAssert.assertThat(row.getKsqlObject(9), Matchers.is(row.getKsqlObject("COMPLEX")));

        // Null check and type mismatch.
        MatcherAssert.assertThat(row.isNull("STR"), Matchers.is(false));
        Assert.assertThrows(ClassCastException.class, () -> {
            row.getInteger("STR");
        });

        // KsqlArray view of the row (0-based positions).
        KsqlArray values = row.values();
        MatcherAssert.assertThat(values.size(), Matchers.is(TEST_COLUMN_NAMES.size()));
        MatcherAssert.assertThat(values.isEmpty(), Matchers.is(false));
        MatcherAssert.assertThat(values.getKsqlObject(0), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(values.getString(1), Matchers.is(row.getString("STR")));
        MatcherAssert.assertThat(values.getLong(2), Matchers.is(row.getLong("LONG")));
        MatcherAssert.assertThat(values.getDecimal(3), Matchers.is(row.getDecimal("DEC")));
        MatcherAssert.assertThat(values.getBytes(4), Matchers.is(row.getBytes("BYTES_")));
        MatcherAssert.assertThat(values.getKsqlArray(5), Matchers.is(row.getKsqlArray("ARRAY")));
        MatcherAssert.assertThat(values.getKsqlObject(6), Matchers.is(row.getKsqlObject("MAP")));
        MatcherAssert.assertThat(values.getKsqlObject(7), Matchers.is(row.getKsqlObject("STRUCT")));
        MatcherAssert.assertThat(values.getKsqlObject(8), Matchers.is(row.getKsqlObject("COMPLEX")));
        MatcherAssert.assertThat(values.toJsonString(), Matchers.is(new JsonArray(values.getList()).toString()));
        MatcherAssert.assertThat(values.toString(), Matchers.is(values.toJsonString()));

        // KsqlObject view of the row.
        KsqlObject asObject = row.asObject();
        MatcherAssert.assertThat(asObject.size(), Matchers.is(TEST_COLUMN_NAMES.size()));
        MatcherAssert.assertThat(asObject.isEmpty(), Matchers.is(false));
        MatcherAssert.assertThat(asObject.fieldNames(), Matchers.contains(TEST_COLUMN_NAMES.toArray()));
        MatcherAssert.assertThat(asObject.getKsqlObject("K"), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(asObject.getString("STR"), Matchers.is(row.getString("STR")));
        MatcherAssert.assertThat(asObject.getLong("LONG"), Matchers.is(row.getLong("LONG")));
        MatcherAssert.assertThat(asObject.getDecimal("DEC"), Matchers.is(row.getDecimal("DEC")));
        MatcherAssert.assertThat(asObject.getBytes("BYTES_"), Matchers.is(row.getBytes("BYTES_")));
        MatcherAssert.assertThat(asObject.getKsqlArray("ARRAY"), Matchers.is(row.getKsqlArray("ARRAY")));
        MatcherAssert.assertThat(asObject.getKsqlObject("MAP"), Matchers.is(row.getKsqlObject("MAP")));
        MatcherAssert.assertThat(asObject.getKsqlObject("STRUCT"), Matchers.is(row.getKsqlObject("STRUCT")));
        MatcherAssert.assertThat(asObject.getKsqlObject("COMPLEX"), Matchers.is(row.getKsqlObject("COMPLEX")));
        MatcherAssert.assertThat(asObject.containsKey("DEC"), Matchers.is(true));
        MatcherAssert.assertThat(asObject.containsKey("notafield"), Matchers.is(false));
        MatcherAssert.assertThat(asObject.toJsonString(), Matchers.is(new JsonObject(asObject.getMap()).toString()));
        MatcherAssert.assertThat(asObject.toString(), Matchers.is(asObject.toJsonString()));
    }

    /**
     * Checks that a pull query returned exactly one row and verifies that row.
     *
     * @param list the rows returned by the pull query
     */
    private static void verifyPullQueryRows(List<Row> list) {
        // Exactly one row is expected. The count used to be spelled with the unrelated
        // EVENT_LOOP_POOL_SIZE constant.
        // NOTE(review): assumes that constant is 1 - confirm.
        MatcherAssert.assertThat(list, Matchers.hasSize(1));
        verifyPullQueryRow(list.get(0));
    }

    /**
     * Verifies a pull query row against {@code PULL_QUERY_EXPECTED_ROW}: its values and
     * metadata, access by column name, access by position, and its {@code KsqlArray} and
     * {@code KsqlObject} views.
     *
     * <p>Column positions are written as literals. They used to be spelled with the unrelated
     * {@code EVENT_LOOP_POOL_SIZE} and {@code PUSH_QUERY_LIMIT_NUM_ROWS} constants.
     * NOTE(review): assumes those constants are 1 and 2 - confirm.
     *
     * @param row the row returned by the pull query
     */
    private static void verifyPullQueryRow(Row row) {
        // Row values and metadata.
        MatcherAssert.assertThat(row.values(), Matchers.equalTo(PULL_QUERY_EXPECTED_ROW));
        MatcherAssert.assertThat(row.columnNames(), Matchers.equalTo(PULL_QUERY_COLUMN_NAMES));
        MatcherAssert.assertThat(row.columnTypes(), Matchers.equalTo(PULL_QUERY_COLUMN_TYPES));

        // Access by column name, compared with the expected array by its 0-based position.
        MatcherAssert.assertThat(row.getKsqlObject("K"), Matchers.is(PULL_QUERY_EXPECTED_ROW.getKsqlObject(0)));
        MatcherAssert.assertThat(row.getLong("LONG"), Matchers.is(PULL_QUERY_EXPECTED_ROW.getLong(1)));

        // Access by position on the row starts at 1.
        MatcherAssert.assertThat(row.getKsqlObject(1), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(row.getLong(2), Matchers.is(row.getLong("LONG")));

        // Null checks and type mismatch.
        MatcherAssert.assertThat(row.isNull("K"), Matchers.is(false));
        MatcherAssert.assertThat(row.isNull("LONG"), Matchers.is(false));
        Assert.assertThrows(ClassCastException.class, () -> {
            row.getInteger("K");
        });

        // KsqlArray view of the row (0-based positions).
        KsqlArray values = row.values();
        MatcherAssert.assertThat(values.size(), Matchers.is(PULL_QUERY_COLUMN_NAMES.size()));
        MatcherAssert.assertThat(values.isEmpty(), Matchers.is(false));
        MatcherAssert.assertThat(values.getKsqlObject(0), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(values.getLong(1), Matchers.is(row.getLong("LONG")));
        MatcherAssert.assertThat(values.toJsonString(), Matchers.is(new JsonArray(values.getList()).toString()));
        MatcherAssert.assertThat(values.toString(), Matchers.is(values.toJsonString()));

        // KsqlObject view of the row.
        KsqlObject asObject = row.asObject();
        MatcherAssert.assertThat(asObject.size(), Matchers.is(PULL_QUERY_COLUMN_NAMES.size()));
        MatcherAssert.assertThat(asObject.isEmpty(), Matchers.is(false));
        MatcherAssert.assertThat(asObject.fieldNames(), Matchers.contains(PULL_QUERY_COLUMN_NAMES.toArray()));
        MatcherAssert.assertThat(asObject.getKsqlObject("K"), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(asObject.getLong("LONG"), Matchers.is(row.getLong("LONG")));
        MatcherAssert.assertThat(asObject.containsKey("LONG"), Matchers.is(true));
        MatcherAssert.assertThat(asObject.containsKey("notafield"), Matchers.is(false));
        MatcherAssert.assertThat(asObject.toJsonString(), Matchers.is(new JsonObject(asObject.getMap()).toString()));
        MatcherAssert.assertThat(asObject.toString(), Matchers.is(asObject.toJsonString()));
    }

    /**
     * Converts each key/row entry into a {@code KsqlArray}: the first key column, then every
     * value column, then a trailing one-byte array containing {@code 23}.
     *
     * @param multimap the key/row entries to convert
     * @return one {@code KsqlArray} per entry, in iteration order
     */
    private static List<KsqlArray> convertToClientRows(Multimap<GenericKey, GenericRow> multimap) {
        List<KsqlArray> clientRows = new ArrayList<>();
        for (Map.Entry<GenericKey, GenericRow> record : multimap.entries()) {
            KsqlArray clientRow = new KsqlArray();
            addObjectToKsqlArray(clientRow, record.getKey().get(0));
            for (Object columnValue : record.getValue().values()) {
                addObjectToKsqlArray(clientRow, columnValue);
            }
            // Trailing byte-array column appended to every row.
            addObjectToKsqlArray(clientRow, new byte[]{23});
            clientRows.add(clientRow);
        }
        return clientRows;
    }

    /**
     * Appends a value to the array after converting it by runtime type: structs become maps,
     * decimals are added through a single-element {@code KsqlArray}, timestamp/date/time values
     * are formatted by {@code SqlTimeTypes}, byte arrays are serialized to a string, and
     * anything else is added unchanged.
     *
     * @param ksqlArray the array to append to
     * @param obj the value to convert and append
     */
    private static void addObjectToKsqlArray(KsqlArray ksqlArray, Object obj) {
        if (obj instanceof Struct) {
            ksqlArray.add(StructuredTypesDataProvider.structToMap((Struct) obj));
        } else if (obj instanceof BigDecimal) {
            KsqlArray singleDecimal = new KsqlArray(Collections.singletonList(obj));
            ksqlArray.addAll(singleDecimal);
        } else if (obj instanceof Timestamp) {
            ksqlArray.add(SqlTimeTypes.formatTimestamp((Timestamp) obj));
        } else if (obj instanceof Date) {
            ksqlArray.add(SqlTimeTypes.formatDate((Date) obj));
        } else if (obj instanceof Time) {
            ksqlArray.add(SqlTimeTypes.formatTime((Time) obj));
        } else if (obj instanceof byte[]) {
            ksqlArray.add(serializeVertX3CompatibleByte((byte[]) obj));
        } else {
            ksqlArray.add(obj);
        }
    }

    /**
     * Serializes a byte array with {@code ApiJsonMapper} and returns the result without its
     * first and last character.
     *
     * @param bArr the bytes to serialize
     * @return the serialized value with one character removed from each end
     * @throws KsqlException if the mapper fails to serialize the bytes
     */
    private static String serializeVertX3CompatibleByte(byte[] bArr) {
        try {
            String serialized = ApiJsonMapper.INSTANCE.get().writeValueAsString(bArr);
            // Drop one character from each end (presumably the quotes of the JSON string).
            // The bounds used to be spelled with the unrelated EVENT_LOOP_POOL_SIZE constant.
            // NOTE(review): assumes that constant is 1 - confirm.
            return serialized.substring(1, serialized.length() - 1);
        } catch (JsonProcessingException e) {
            throw new KsqlException(e);
        }
    }

    /**
     * Builds a matcher for the non-windowed stream backed by the given provider, using the
     * provider's source and topic names and the test's key and value formats.
     *
     * @param testDataProvider the provider whose stream is expected
     * @return a matcher for the corresponding {@code StreamInfo}
     */
    private static Matcher<? super StreamInfo> streamForProvider(TestDataProvider testDataProvider) {
        String streamName = testDataProvider.sourceName();
        String topicName = testDataProvider.topicName();
        return streamInfo(streamName, topicName, KEY_FORMAT.name(), VALUE_FORMAT.name(), false);
    }

    /**
     * Builds a matcher that accepts a {@code StreamInfo} whose name, topic, key format, value
     * format and windowed flag all equal the given values.
     *
     * @param str the expected stream name
     * @param str2 the expected topic name
     * @param str3 the expected key format
     * @param str4 the expected value format
     * @param z the expected windowed flag
     * @return the matcher
     */
    private static Matcher<? super StreamInfo> streamInfo(final String str, final String str2, final String str3, final String str4, final boolean z) {
        return new TypeSafeDiagnosingMatcher<StreamInfo>() { // from class: io.confluent.ksql.api.client.integration.ClientIntegrationTest.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override
            public boolean matchesSafely(StreamInfo streamInfo, Description description) {
                // The key format is compared with getKeyFormat(). It used to be compared with
                // getValueFormat(), so the key format was never actually checked.
                return str.equals(streamInfo.getName())
                    && str2.equals(streamInfo.getTopic())
                    && str3.equals(streamInfo.getKeyFormat())
                    && str4.equals(streamInfo.getValueFormat())
                    && z == streamInfo.isWindowed();
            }

            @Override
            public void describeTo(Description description) {
                description.appendText(String.format("streamName: %s. topicName: %s. keyFormat: %s. valueFormat: %s. isWindowed: %s", str, str2, str3, str4, Boolean.valueOf(z)));
            }
        };
    }

    /**
     * Builds a matcher that accepts a {@code TableInfo} whose name, topic, key format, value
     * format and windowed flag all equal the given values.
     *
     * @param str the expected table name
     * @param str2 the expected topic name
     * @param str3 the expected key format
     * @param str4 the expected value format
     * @param z the expected windowed flag
     * @return the matcher
     */
    private static Matcher<? super TableInfo> tableInfo(final String str, final String str2, final String str3, final String str4, final boolean z) {
        return new TypeSafeDiagnosingMatcher<TableInfo>() { // from class: io.confluent.ksql.api.client.integration.ClientIntegrationTest.2
            /* JADX INFO: Access modifiers changed from: protected */
            @Override
            public boolean matchesSafely(TableInfo actual, Description mismatch) {
                // Checks run in the same order as the parameters; the first mismatch wins.
                if (!str.equals(actual.getName())) {
                    return false;
                }
                if (!str2.equals(actual.getTopic())) {
                    return false;
                }
                if (!str3.equals(actual.getKeyFormat())) {
                    return false;
                }
                if (!str4.equals(actual.getValueFormat())) {
                    return false;
                }
                return z == actual.isWindowed();
            }

            @Override
            public void describeTo(Description description) {
                String expected = String.format("tableName: %s. topicName: %s. keyFormat: %s. valueFormat: %s. isWindowed: %s", str, str2, str3, str4, Boolean.valueOf(z));
                description.appendText(expected);
            }
        };
    }

    /**
     * Builds a matcher that accepts a {@code TopicInfo} with the given name, exactly one
     * partition, and a replica list holding a single entry equal to one.
     *
     * <p>The expected counts are written as literals. They used to be spelled with the
     * unrelated {@code EVENT_LOOP_POOL_SIZE} constant.
     * NOTE(review): assumes that constant is 1 - confirm.
     *
     * @param str the expected topic name
     * @return the matcher
     */
    private static Matcher<? super TopicInfo> topicInfo(final String str) {
        return new TypeSafeDiagnosingMatcher<TopicInfo>() { // from class: io.confluent.ksql.api.client.integration.ClientIntegrationTest.3
            /* JADX INFO: Access modifiers changed from: protected */
            @Override
            public boolean matchesSafely(TopicInfo topicInfo, Description description) {
                if (!str.equals(topicInfo.getName()) || topicInfo.getPartitions() != 1) {
                    return false;
                }
                List<?> replicasPerPartition = topicInfo.getReplicasPerPartition();
                // equals() also rejects a null or non-Integer entry instead of throwing.
                return replicasPerPartition.size() == 1
                    && Integer.valueOf(1).equals(replicasPerPartition.get(0));
            }

            @Override
            public void describeTo(Description description) {
                description.appendText("name: " + str);
            }
        };
    }

    // Builds the shared REST app from the test harness's bootstrap servers, then the rule chain
    // that retries on ZooKeeperClientException and wraps the harness and the REST app.
    static {
        // Binding the method reference already null-checks TEST_HARNESS.
        REST_APP = TestKsqlRestApp.builder(TEST_HARNESS::kafkaBootstrapServers)
            .withProperty("ksql.streams.num.stream.threads", Integer.valueOf(EVENT_LOOP_POOL_SIZE))
            .withProperty("ksql.persistence.default.format.key", "JSON")
            .withProperty("ksql.verticle.instances", Integer.valueOf(EVENT_LOOP_POOL_SIZE))
            .withProperty("ksql.worker.pool.size", Integer.valueOf(WORKER_POOL_SIZE))
            .withProperty("ksql.headers.columns.enabled", true)
            .build();

        CHAIN = RuleChain
            .outerRule(Retry.of(3, ZooKeeperClientException.class, 3L, TimeUnit.SECONDS))
            .around(TEST_HARNESS)
            .around(REST_APP);
    }
}
