package io.confluent.ksql.api.client.integration;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ColumnType;
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.ClientTestUtil;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.execution.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.SqlTimeTypes;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.test.util.AssertEventually;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.StructuredTypesDataProvider;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KafkaStreams;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.reactivestreams.Publisher;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/ksql/api/client/integration/PushV2ClientContinueIntegrationTest.class */
public class PushV2ClientContinueIntegrationTest {
    private static final StructuredTypesDataProvider TEST_DATA_PROVIDER = new StructuredTypesDataProvider();
    private static final StructuredTypesDataProvider TEST_MORE_DATA_PROVIDER = new StructuredTypesDataProvider(StructuredTypesDataProvider.Batch.BATCH2);
    private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName();
    private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName();
    private static final int TEST_NUM_ROWS = TEST_DATA_PROVIDER.data().size();
    private static final int TEST_MORE_NUM_ROWS = TEST_MORE_DATA_PROVIDER.data().size();
    private static final List<String> TEST_COLUMN_NAMES = ImmutableList.of("K", "STR", "LONG", "DEC", "BYTES_", "ARRAY", "MAP", "STRUCT", "COMPLEX", "TIMESTAMP", "DATE", "TIME", new String[]{"HEAD"});
    private static final List<ColumnType> TEST_COLUMN_TYPES = RowUtil.columnTypesFromStrings(ImmutableList.of("STRUCT", "STRING", "BIGINT", "DECIMAL", "BYTES", "ARRAY", "MAP", "STRUCT", "STRUCT", "TIMESTAMP", "DATE", "TIME", new String[]{"BYTES"}));
    private static final List<KsqlArray> TEST_EXPECTED_ROWS = convertToClientRows(TEST_DATA_PROVIDER.data());
    private static final List<KsqlArray> TEST_MORE_EXPECTED_ROWS = convertToClientRows(TEST_MORE_DATA_PROVIDER.data());
    private static final Format KEY_FORMAT = FormatFactory.JSON;
    private static final Format VALUE_FORMAT = FormatFactory.JSON;
    private static final Supplier<Long> TS_SUPPLIER = () -> {
        return 0L;
    };
    private static final Supplier<List<Header>> HEADERS_SUPPLIER = () -> {
        return ImmutableList.of(new RecordHeader("h0", new byte[]{23}));
    };
    private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
    private static final int EVENT_LOOP_POOL_SIZE = 1;
    private static final int WORKER_POOL_SIZE = 10;
    private static final TestKsqlRestApp REST_APP;

    @ClassRule
    public static final RuleChain CHAIN;
    private Vertx vertx;
    private Client client;

    @BeforeClass
    public static void setUpClass() throws Exception {
        TEST_HARNESS.ensureTopics(new String[]{TEST_TOPIC});
        RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER);
    }

    @AfterClass
    public static void classTearDown() {
        REST_APP.getPersistentQueries().forEach(str -> {
            makeKsqlRequest("TERMINATE " + str + ";");
        });
    }

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        this.client = createClient();
    }

    @After
    public void tearDown() {
        if (this.client != null) {
            this.client.close();
        }
        if (this.vertx != null) {
            this.vertx.close();
        }
        REST_APP.getServiceContext().close();
    }

    @Test
    public void shouldContinueStreamPushQueryV2AfterNetworkFault() throws Exception {
        makeKsqlRequest("CREATE STREAM SOURCE_SPQV2 AS SELECT * FROM " + TEST_STREAM + ";");
        assertAllPersistentQueriesRunning();
        StreamedQueryResultImpl streamedQueryResultImpl = (StreamedQueryResult) this.client.streamQuery("SELECT * FROM SOURCE_SPQV2 EMIT CHANGES;").get();
        MatcherAssert.assertThat(streamedQueryResultImpl.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(streamedQueryResultImpl.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(streamedQueryResultImpl.queryID(), Matchers.is(Matchers.notNullValue()));
        assertExpectedScalablePushQueries(EVENT_LOOP_POOL_SIZE);
        TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT, TS_SUPPLIER, HEADERS_SUPPLIER);
        StreamedQueryResultImpl streamedQueryResultImpl2 = streamedQueryResultImpl;
        streamedQueryResultImpl2.getClass();
        AssertEventually.assertThatEventually(streamedQueryResultImpl2::hasContinuationToken, Matchers.is(true));
        String str = (String) streamedQueryResultImpl.getContinuationToken().get();
        shouldReceiveStreamRows(streamedQueryResultImpl, false, TEST_NUM_ROWS, TEST_EXPECTED_ROWS);
        AssertEventually.assertThatEventually(() -> {
            return Boolean.valueOf(str.equals(((StreamedQueryResultImpl) streamedQueryResultImpl).getContinuationToken().get()));
        }, Matchers.is(false));
        try {
            REST_APP.stop();
        } catch (Exception e) {
        }
        REST_APP.start();
        assertAllPersistentQueriesRunning();
        StreamedQueryResult streamedQueryResult = (StreamedQueryResult) streamedQueryResultImpl.continueFromLastContinuationToken().get();
        MatcherAssert.assertThat(streamedQueryResultImpl.columnNames(), Matchers.is(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(streamedQueryResultImpl.columnTypes(), Matchers.is(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(streamedQueryResultImpl.queryID(), Matchers.is(Matchers.notNullValue()));
        assertExpectedScalablePushQueries(EVENT_LOOP_POOL_SIZE);
        TEST_HARNESS.produceRows(TEST_TOPIC, TEST_MORE_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT, TS_SUPPLIER, HEADERS_SUPPLIER);
        shouldReceiveStreamRows(streamedQueryResult, false, TEST_MORE_NUM_ROWS, TEST_MORE_EXPECTED_ROWS);
    }

    private Client createClient() {
        return Client.create(ClientOptions.create().setHost("localhost").setPort(((URL) REST_APP.getListeners().get(0)).getPort()), this.vertx);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static List<KsqlEntity> makeKsqlRequest(String str) {
        return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, str);
    }

    private static void shouldReceiveStreamRows(Publisher<Row> publisher, boolean z, int i, List<KsqlArray> list) {
        ClientTestUtil.shouldReceiveRows(publisher, i, list2 -> {
            verifyStreamRows(list2, i, list);
        }, z);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void verifyStreamRows(List<Row> list, int i, List<KsqlArray> list2) {
        List list3 = (List) list.stream().sorted(ClientTestUtil::compareRowByOrderedLong).collect(Collectors.toList());
        for (int i2 = 0; i2 < Math.min(i, list.size()); i2 += EVENT_LOOP_POOL_SIZE) {
            verifyStreamRowWithIndex((Row) list3.get(i2), i2, list2);
        }
        if (list.size() < i) {
            Assert.fail("Expected " + i + " but only got " + list.size());
        } else if (list.size() > i) {
            Assert.fail("Expected " + i + " but got " + list.size() + ". The extra rows were: " + list.subList(i, list.size()));
        }
        MatcherAssert.assertThat(list, Matchers.hasSize(i));
    }

    private static void verifyStreamRowWithIndex(Row row, int i, List<KsqlArray> list) {
        KsqlArray ksqlArray = (KsqlArray) ((List) list.stream().sorted(ClientTestUtil::compareKsqlArrayByOrderedLong).collect(Collectors.toList())).get(i);
        MatcherAssert.assertThat(row.values(), Matchers.equalTo(ksqlArray));
        MatcherAssert.assertThat(row.columnNames(), Matchers.equalTo(TEST_COLUMN_NAMES));
        MatcherAssert.assertThat(row.columnTypes(), Matchers.equalTo(TEST_COLUMN_TYPES));
        MatcherAssert.assertThat(row.getKsqlObject("K"), Matchers.is(ksqlArray.getKsqlObject(0)));
        MatcherAssert.assertThat(row.getString("STR"), Matchers.is(ksqlArray.getString(EVENT_LOOP_POOL_SIZE)));
        MatcherAssert.assertThat(row.getLong("LONG"), Matchers.is(ksqlArray.getLong(2)));
        MatcherAssert.assertThat(row.getDecimal("DEC"), Matchers.is(ksqlArray.getDecimal(3)));
        MatcherAssert.assertThat(row.getBytes("BYTES_"), Matchers.is(ksqlArray.getBytes(4)));
        MatcherAssert.assertThat(row.getKsqlArray("ARRAY"), Matchers.is(ksqlArray.getKsqlArray(5)));
        MatcherAssert.assertThat(row.getKsqlObject("MAP"), Matchers.is(ksqlArray.getKsqlObject(6)));
        MatcherAssert.assertThat(row.getKsqlObject("STRUCT"), Matchers.is(ksqlArray.getKsqlObject(7)));
        MatcherAssert.assertThat(row.getKsqlObject("COMPLEX"), Matchers.is(ksqlArray.getKsqlObject(8)));
        MatcherAssert.assertThat(row.getKsqlObject(EVENT_LOOP_POOL_SIZE), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(row.getString(2), Matchers.is(row.getString("STR")));
        MatcherAssert.assertThat(row.getLong(3), Matchers.is(row.getLong("LONG")));
        MatcherAssert.assertThat(row.getDecimal(4), Matchers.is(row.getDecimal("DEC")));
        MatcherAssert.assertThat(row.getBytes(5), Matchers.is(row.getBytes("BYTES_")));
        MatcherAssert.assertThat(row.getKsqlArray(6), Matchers.is(row.getKsqlArray("ARRAY")));
        MatcherAssert.assertThat(row.getKsqlObject(7), Matchers.is(row.getKsqlObject("MAP")));
        MatcherAssert.assertThat(row.getKsqlObject(8), Matchers.is(row.getKsqlObject("STRUCT")));
        MatcherAssert.assertThat(row.getKsqlObject(9), Matchers.is(row.getKsqlObject("COMPLEX")));
        MatcherAssert.assertThat(Boolean.valueOf(row.isNull("STR")), Matchers.is(false));
        Assert.assertThrows(ClassCastException.class, () -> {
            row.getInteger("STR");
        });
        KsqlArray values = row.values();
        MatcherAssert.assertThat(Integer.valueOf(values.size()), Matchers.is(Integer.valueOf(TEST_COLUMN_NAMES.size())));
        MatcherAssert.assertThat(Boolean.valueOf(values.isEmpty()), Matchers.is(false));
        MatcherAssert.assertThat(values.getKsqlObject(0), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(values.getString(EVENT_LOOP_POOL_SIZE), Matchers.is(row.getString("STR")));
        MatcherAssert.assertThat(values.getLong(2), Matchers.is(row.getLong("LONG")));
        MatcherAssert.assertThat(values.getDecimal(3), Matchers.is(row.getDecimal("DEC")));
        MatcherAssert.assertThat(values.getBytes(4), Matchers.is(row.getBytes("BYTES_")));
        MatcherAssert.assertThat(values.getKsqlArray(5), Matchers.is(row.getKsqlArray("ARRAY")));
        MatcherAssert.assertThat(values.getKsqlObject(6), Matchers.is(row.getKsqlObject("MAP")));
        MatcherAssert.assertThat(values.getKsqlObject(7), Matchers.is(row.getKsqlObject("STRUCT")));
        MatcherAssert.assertThat(values.getKsqlObject(8), Matchers.is(row.getKsqlObject("COMPLEX")));
        MatcherAssert.assertThat(values.toJsonString(), Matchers.is(new JsonArray(values.getList()).toString()));
        MatcherAssert.assertThat(values.toString(), Matchers.is(values.toJsonString()));
        KsqlObject asObject = row.asObject();
        MatcherAssert.assertThat(Integer.valueOf(asObject.size()), Matchers.is(Integer.valueOf(TEST_COLUMN_NAMES.size())));
        MatcherAssert.assertThat(Boolean.valueOf(asObject.isEmpty()), Matchers.is(false));
        MatcherAssert.assertThat(asObject.fieldNames(), Matchers.contains(TEST_COLUMN_NAMES.toArray()));
        MatcherAssert.assertThat(asObject.getKsqlObject("K"), Matchers.is(row.getKsqlObject("K")));
        MatcherAssert.assertThat(asObject.getString("STR"), Matchers.is(row.getString("STR")));
        MatcherAssert.assertThat(asObject.getLong("LONG"), Matchers.is(row.getLong("LONG")));
        MatcherAssert.assertThat(asObject.getDecimal("DEC"), Matchers.is(row.getDecimal("DEC")));
        MatcherAssert.assertThat(asObject.getBytes("BYTES_"), Matchers.is(row.getBytes("BYTES_")));
        MatcherAssert.assertThat(asObject.getKsqlArray("ARRAY"), Matchers.is(row.getKsqlArray("ARRAY")));
        MatcherAssert.assertThat(asObject.getKsqlObject("MAP"), Matchers.is(row.getKsqlObject("MAP")));
        MatcherAssert.assertThat(asObject.getKsqlObject("STRUCT"), Matchers.is(row.getKsqlObject("STRUCT")));
        MatcherAssert.assertThat(asObject.getKsqlObject("COMPLEX"), Matchers.is(row.getKsqlObject("COMPLEX")));
        MatcherAssert.assertThat(Boolean.valueOf(asObject.containsKey("DEC")), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(asObject.containsKey("notafield")), Matchers.is(false));
        MatcherAssert.assertThat(asObject.toJsonString(), Matchers.is(new JsonObject(asObject.getMap()).toString()));
        MatcherAssert.assertThat(asObject.toString(), Matchers.is(asObject.toJsonString()));
    }

    private static List<KsqlArray> convertToClientRows(Multimap<GenericKey, GenericRow> multimap) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : multimap.entries()) {
            KsqlArray ksqlArray = new KsqlArray();
            addObjectToKsqlArray(ksqlArray, ((GenericKey) entry.getKey()).get(0));
            Iterator it = ((GenericRow) entry.getValue()).values().iterator();
            while (it.hasNext()) {
                addObjectToKsqlArray(ksqlArray, it.next());
            }
            addObjectToKsqlArray(ksqlArray, new byte[]{23});
            arrayList.add(ksqlArray);
        }
        return arrayList;
    }

    private static void addObjectToKsqlArray(KsqlArray ksqlArray, Object obj) {
        if (obj instanceof Struct) {
            ksqlArray.add(StructuredTypesDataProvider.structToMap((Struct) obj));
            return;
        }
        if (obj instanceof BigDecimal) {
            ksqlArray.addAll(new KsqlArray(Collections.singletonList(obj)));
            return;
        }
        if (obj instanceof Timestamp) {
            ksqlArray.add(SqlTimeTypes.formatTimestamp((Timestamp) obj));
            return;
        }
        if (obj instanceof Date) {
            ksqlArray.add(SqlTimeTypes.formatDate((Date) obj));
            return;
        }
        if (obj instanceof Time) {
            ksqlArray.add(SqlTimeTypes.formatTime((Time) obj));
        } else if (obj instanceof byte[]) {
            ksqlArray.add(serializeVertX3CompatibleByte((byte[]) obj));
        } else {
            ksqlArray.add(obj);
        }
    }

    private static String serializeVertX3CompatibleByte(byte[] bArr) {
        try {
            String writeValueAsString = ApiJsonMapper.INSTANCE.get().writeValueAsString(bArr);
            return writeValueAsString.substring(EVENT_LOOP_POOL_SIZE, writeValueAsString.length() - EVENT_LOOP_POOL_SIZE);
        } catch (JsonProcessingException e) {
            throw new KsqlException(e);
        }
    }

    private void assertExpectedScalablePushQueries(int i) {
        AssertEventually.assertThatEventually(() -> {
            for (PersistentQueryMetadata persistentQueryMetadata : REST_APP.getEngine().getPersistentQueries()) {
                if (((ScalablePushRegistry) persistentQueryMetadata.getScalablePushRegistry().get()).latestNumRegistered() < i || !((ScalablePushRegistry) persistentQueryMetadata.getScalablePushRegistry().get()).latestHasAssignment()) {
                    return false;
                }
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return true;
        }, Matchers.is(true));
    }

    private void assertAllPersistentQueriesRunning() {
        AssertEventually.assertThatEventually("persistent queries check", () -> {
            Iterator it = REST_APP.getEngine().getPersistentQueries().iterator();
            while (it.hasNext()) {
                if (((PersistentQueryMetadata) it.next()).getState() != KafkaStreams.State.RUNNING) {
                    return false;
                }
            }
            return true;
        }, Matchers.is(true), 60000L, TimeUnit.MILLISECONDS);
    }

    static {
        IntegrationTestHarness integrationTestHarness = TEST_HARNESS;
        integrationTestHarness.getClass();
        REST_APP = TestKsqlRestApp.builder(integrationTestHarness::kafkaBootstrapServers).withProperty("ksql.streams.num.stream.threads", Integer.valueOf(EVENT_LOOP_POOL_SIZE)).withProperty("ksql.persistence.default.format.key", "JSON").withProperty("ksql.verticle.instances", Integer.valueOf(EVENT_LOOP_POOL_SIZE)).withProperty("ksql.worker.pool.size", Integer.valueOf(WORKER_POOL_SIZE)).withProperty("ksql.headers.columns.enabled", true).withProperty("listeners", "http://localhost:8088").withProperty("ksql.query.push.v2.new.latest.delay.ms", 0L).withProperty("ksql.query.push.v2.enabled", true).withProperty("ksql.query.push.v2.registry.installed", true).withProperty("ksql.query.push.v2.continuation.tokens.enabled", true).withProperty("ksql.streams.max.poll.interval.ms", 5000).withProperty("ksql.streams.session.timeout.ms", 10000).withProperty("ksql.streams.auto.offset.reset", "latest").build();
        CHAIN = RuleChain.outerRule(Retry.of(3, ZooKeeperClientException.class, 3L, TimeUnit.SECONDS)).around(TEST_HARNESS).around(REST_APP);
    }
}
