package io.confluent.ksql.api.client.integration;

import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.util.StructuredTypesDataProvider;
import io.confluent.ksql.util.TestDataProvider;
import io.vertx.core.Vertx;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;

/**
 * Integration test that opens several streamed push queries against a single
 * {@link TestKsqlRestApp} through its REST client.
 *
 * <p>The REST app is configured with {@value #EVENT_LOOP_POOL_SIZE} verticle instance and a
 * worker pool of {@value #WORKER_POOL_SIZE}; the test issues
 * {@value #NUM_CONCURRENT_REQUESTS_TO_TEST} push queries without closing any of them first and
 * expects every request to succeed.
 */
@Category({IntegrationTest.class})
public class RestClientIntegrationTest {
    private static final String AGG_TABLE = "AGG_TABLE";
    private static final int EVENT_LOOP_POOL_SIZE = 1;
    private static final int NUM_CONCURRENT_REQUESTS_TO_TEST = 5;
    private static final int WORKER_POOL_SIZE = 10;

    private static final StructuredTypesDataProvider TEST_DATA_PROVIDER =
            new StructuredTypesDataProvider();
    private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName();
    private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName();

    private static final Format KEY_FORMAT = FormatFactory.JSON;
    private static final Format VALUE_FORMAT = FormatFactory.JSON;

    // Physical schema handed to the harness when verifying the rows of AGG_TABLE:
    // a struct key K (with an array-of-string field F1) and a single BIGINT value column LONG.
    private static final PhysicalSchema AGG_SCHEMA = PhysicalSchema.from(
            LogicalSchema.builder()
                    .keyColumn(
                            ColumnName.of("K"),
                            SqlTypes.struct().field("F1", SqlTypes.array(SqlTypes.STRING)).build())
                    .valueColumn(ColumnName.of("LONG"), SqlTypes.BIGINT)
                    .build(),
            SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES),
            SerdeFeatures.of());

    // Two providers that reuse the structured-types schema but carry no rows.
    private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER = new TestDataProvider(
            "EMPTY_STRUCTURED_TYPES", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of());
    private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName();
    private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER_2 = new TestDataProvider(
            "EMPTY_STRUCTURED_TYPES_2", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of());
    private static final String EMPTY_TEST_TOPIC_2 = EMPTY_TEST_DATA_PROVIDER_2.topicName();

    private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;";

    // NOTE: static initialisers run in textual order. TEST_HARNESS must stay above REST_APP,
    // and both must stay above CHAIN, because each one is used to build the next.
    private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

    private static final TestKsqlRestApp REST_APP = TestKsqlRestApp
            .builder(TEST_HARNESS::kafkaBootstrapServers)
            .withProperty("ksql.streams.num.stream.threads", 1)
            .withProperty("ksql.persistence.default.format.key", "JSON")
            .withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE)
            .withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE)
            .build();

    // Rule order: the retry rule is outermost, then the harness, then the REST app.
    @ClassRule
    public static final RuleChain CHAIN = RuleChain
            .outerRule(Retry.of(3, ZooKeeperClientException.class, 3L, TimeUnit.SECONDS))
            .around(TEST_HARNESS)
            .around(REST_APP);

    private Vertx vertx;

    /**
     * Creates the topics, streams and aggregate table used by this class, then writes a
     * Connect worker properties file into a fresh temporary directory.
     *
     * @throws Exception if any of the setup steps fails
     */
    @BeforeClass
    public static void setUpClass() throws Exception {
        TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2);
        TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT);

        RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER);
        RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER);
        RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER_2);

        makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS "
                + "SELECT K, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY K;");

        // Presumably blocks until 4 unique rows are readable from the table's topic,
        // i.e. until the aggregate has been materialised -- confirm against the harness.
        TEST_HARNESS.verifyAvailableUniqueRows(AGG_TABLE, 4, KEY_FORMAT, VALUE_FORMAT, AGG_SCHEMA);

        final String testDir =
                Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "client_integ_test").toString();
        final String connectFilePath = Paths.get(testDir, "connect.properties").toString();
        Files.createDirectories(Paths.get(testDir));

        // The explicit <String, String> witness is required: without it the builder chain is
        // inferred as ImmutableMap<Object, Object>, which is not a Map<String, String>.
        writeConnectConfigs(connectFilePath, ImmutableMap.<String, String>builder()
                .put("bootstrap.servers", TEST_HARNESS.kafkaBootstrapServers())
                .put("group.id", UUID.randomUUID().toString())
                .put("key.converter", StringConverter.class.getName())
                .put("value.converter", JsonConverter.class.getName())
                .put("offset.storage.topic", "connect-offsets")
                .put("status.storage.topic", "connect-status")
                .put("config.storage.topic", "connect-config")
                .put("offset.storage.replication.factor", "1")
                .put("status.storage.replication.factor", "1")
                .put("config.storage.replication.factor", "1")
                .put("value.converter.schemas.enable", "false")
                .build());
    }

    /**
     * Appends each entry of {@code map} to the file at {@code str} as a {@code key=value} line,
     * encoded as UTF-8.
     *
     * @param str path of the properties file; opened in append mode
     * @param map configuration entries to write, in the map's iteration order
     * @throws Exception if the file cannot be opened
     */
    private static void writeConnectConfigs(String str, Map<String, String> map) throws Exception {
        // try-with-resources closes the writer on every path and attaches any failure
        // from close() as a suppressed exception of the primary one.
        try (PrintWriter out = new PrintWriter(
                new OutputStreamWriter(new FileOutputStream(str, true), StandardCharsets.UTF_8))) {
            for (Map.Entry<String, String> config : map.entrySet()) {
                out.println(config.getKey() + "=" + config.getValue());
            }
        }
    }

    /** Issues a {@code TERMINATE} statement for every persistent query the REST app reports. */
    @AfterClass
    public static void classTearDown() {
        REST_APP.getPersistentQueries()
                .forEach(queryId -> makeKsqlRequest("TERMINATE " + queryId + ";"));
    }

    /** Creates a fresh {@link Vertx} instance before each test. */
    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
    }

    /** Closes the per-test {@link Vertx} instance (if any) and the REST app's service context. */
    @After
    public void tearDown() {
        if (this.vertx != null) {
            this.vertx.close();
        }
        REST_APP.getServiceContext().close();
    }

    /**
     * Opens {@value #NUM_CONCURRENT_REQUESTS_TO_TEST} streamed push queries, each from a newly
     * built client, asserts that every response is successful, and then closes all of them.
     */
    @Test(timeout = 120000)
    public void shouldStreamMultiplePushQueriesRest() {
        final List<RestResponse<? extends StreamPublisher<?>>> responses =
                new ArrayList<>(NUM_CONCURRENT_REQUESTS_TO_TEST);

        // All requests are issued before any publisher is closed, so the streams are open
        // at the same time. The loop counter doubles as the request's second (Long) argument.
        for (long seqNum = 0; seqNum < NUM_CONCURRENT_REQUESTS_TO_TEST; seqNum++) {
            responses.add(REST_APP.buildKsqlClient().makeQueryRequestStreamed(PUSH_QUERY, seqNum));
        }

        MatcherAssert.assertThat(
                responses,
                Matchers.everyItem(Matchers.hasProperty("successful", Matchers.is(true))));

        for (final RestResponse<? extends StreamPublisher<?>> response : responses) {
            response.getResponse().close();
        }
    }

    /**
     * Sends {@code str} as a KSQL statement to the shared REST app.
     *
     * @param str the statement text to execute
     * @return the entities returned by {@link RestIntegrationTestUtil#makeKsqlRequest}
     */
    public static List<KsqlEntity> makeKsqlRequest(String str) {
        return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, str);
    }
}
