package io.confluent.ksql.api.client.integration;

import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.vertx.core.Vertx;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.zookeeper.ZooKeeperClientException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/ksql/api/client/integration/AssertClientIntegrationTest.class */
public class AssertClientIntegrationTest {
    /**
     * Physical schema with an INTEGER key column {@code K} and a BIGINT value column
     * {@code LONG}, built with empty key and value serde feature sets.
     */
    private static final PhysicalSchema SCHEMA = PhysicalSchema.from(
            LogicalSchema.builder()
                    .keyColumn(ColumnName.of("K"), SqlTypes.INTEGER)
                    .valueColumn(ColumnName.of("LONG"), SqlTypes.BIGINT)
                    .build(),
            SerdeFeatures.of(new SerdeFeature[0]),
            SerdeFeatures.of(new SerdeFeature[0]));

    /** Shared test harness; supplies the bootstrap servers and service context used by the REST app. */
    private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

    /** KSQL REST application under test; assigned in the static initializer. */
    private static final TestKsqlRestApp REST_APP;

    /** Class-level rule chain wrapping the harness and the REST app; assigned in the static initializer. */
    @ClassRule
    public static final RuleChain CHAIN;

    /** Vert.x instance backing the client; created fresh for every test. */
    private Vertx vertx;

    /** Client under test; created fresh for every test. */
    private Client client;

    /**
     * Creates the {@code abc} topic and registers {@link #SCHEMA} for it once before any test runs.
     */
    @BeforeClass
    public static void setUpClass() {
        final String topic = "abc";
        TEST_HARNESS.ensureTopics(new String[]{topic});
        // NOTE(review): the tests refer to subject "abc-value", so the trailing `false`
        // presumably selects the value (not key) schema — confirm against the harness.
        TEST_HARNESS.ensureSchema(topic, SCHEMA, false);
    }

    /**
     * Builds a fresh Vert.x instance and a client bound to it before each test.
     */
    @Before
    public void setUp() {
        // Vert.x must exist first: createClient() reads the vertx field.
        vertx = Vertx.vertx();
        client = createClient();
    }

    /**
     * Closes the per-test client and Vert.x instance (when they were created) and then
     * closes the REST app's service context.
     */
    @After
    public void tearDown() {
        final Client openClient = client;
        if (openClient != null) {
            openClient.close();
        }

        final Vertx openVertx = vertx;
        if (openVertx != null) {
            openVertx.close();
        }

        // NOTE(review): the REST app is built with TEST_HARNESS::getServiceContext as its
        // static service context, so this may close a context shared across tests — confirm
        // that closing it after every test is intended.
        REST_APP.getServiceContext().close();
    }

    /**
     * Asserting that an unregistered schema exists must fail.
     *
     * <p>The client future is expected to complete exceptionally with an
     * {@link ExecutionException} whose cause is a {@link KsqlClientException} describing the
     * server's 417 response.
     */
    @Test
    public void shouldThrowOnAssertNonexistentSchema() {
        // With an empty lambda nothing is thrown and assertThrows itself fails, so the
        // client call must live inside it. Subject and id are the ones quoted in the
        // expected server message below.
        final Exception exc = Assert.assertThrows(
                ExecutionException.class,
                () -> this.client.assertSchema("NONEXISTENT", 3, true).get());

        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        final String message = exc.getCause().getMessage();
        MatcherAssert.assertThat(message, Matchers.containsString("Received 417 response from server"));
        MatcherAssert.assertThat(message, Matchers.containsString("Schema with subject name NONEXISTENT id 3 does not exist"));
        MatcherAssert.assertThat(message, Matchers.containsString("Error code: 41700"));
    }

    /**
     * Asserting that a registered schema does NOT exist must fail.
     *
     * <p>The client future is expected to complete exceptionally with an
     * {@link ExecutionException} whose cause is a {@link KsqlClientException} describing the
     * server's 417 response.
     */
    @Test
    public void shouldThrowOnAssertNotExistsNonExistingSchema() {
        // With an empty lambda nothing is thrown and assertThrows itself fails, so the
        // client call must live inside it. Subject and id are the ones quoted in the
        // expected server message below; `false` asserts non-existence.
        final Exception exc = Assert.assertThrows(
                ExecutionException.class,
                () -> this.client.assertSchema("abc-value", 1, false).get());

        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        final String message = exc.getCause().getMessage();
        MatcherAssert.assertThat(message, Matchers.containsString("Received 417 response from server"));
        MatcherAssert.assertThat(message, Matchers.containsString("Schema with subject name abc-value id 1 exists."));
        MatcherAssert.assertThat(message, Matchers.containsString("Error code: 41700"));
    }

    /**
     * Asserts the existence of the schema registered for topic {@code abc} by subject,
     * by subject plus id, and by id alone with an explicit timeout.
     *
     * @throws ExecutionException if any assertion future completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void shouldAssertSchema() throws ExecutionException, InterruptedException {
        final String subject = "abc-value";
        final int schemaId = 1;

        client.assertSchema(subject, true).get();
        client.assertSchema(subject, schemaId, true).get();
        client.assertSchema(schemaId, true, Duration.ofSeconds(3L)).get();
    }

    /**
     * Asserts the non-existence of schemas by id, by subject with a timeout, and by
     * subject plus id with a timeout.
     *
     * @throws ExecutionException if any assertion future completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void shouldAssertNotExistsSchema() throws ExecutionException, InterruptedException {
        final String missingSubject = "NONEXISTENT";
        final Duration timeout = Duration.ofSeconds(3L);

        client.assertSchema(34, false).get();
        client.assertSchema(missingSubject, false, timeout).get();
        client.assertSchema(missingSubject, 43, false, timeout).get();
    }

    /**
     * Asserting topic existence must fail both for a topic that does not exist and for
     * an existing topic whose asserted configuration does not match.
     *
     * <p>Each client future is expected to complete exceptionally with an
     * {@link ExecutionException} whose cause is a {@link KsqlClientException} describing the
     * server's 417 response.
     */
    @Test
    public void shouldThrowOnAssertNonexistentTopic() {
        // With empty lambdas nothing is thrown and assertThrows itself fails, so the client
        // calls must live inside them. Arguments are derived from the expected messages below.
        final Exception missingTopic = Assert.assertThrows(
                ExecutionException.class,
                () -> this.client.assertTopic("NONEXISTENT", true).get());
        // Topic "abc" exists, but is asserted with a wrong partition count (7) and an
        // unknown property (FOO); the value given for FOO is arbitrary.
        final Exception badConfig = Assert.assertThrows(
                ExecutionException.class,
                () -> this.client.assertTopic("abc", ImmutableMap.of("partitions", 7, "FOO", 3), true, Duration.ofSeconds(3L)).get());

        MatcherAssert.assertThat(missingTopic.getCause(), Matchers.instanceOf(KsqlClientException.class));
        final String missingTopicMessage = missingTopic.getCause().getMessage();
        MatcherAssert.assertThat(missingTopicMessage, Matchers.containsString("Received 417 response from server"));
        MatcherAssert.assertThat(missingTopicMessage, Matchers.containsString("Topic NONEXISTENT does not exist."));
        MatcherAssert.assertThat(missingTopicMessage, Matchers.containsString("Error code: 41700"));

        MatcherAssert.assertThat(badConfig.getCause(), Matchers.instanceOf(KsqlClientException.class));
        final String badConfigMessage = badConfig.getCause().getMessage();
        MatcherAssert.assertThat(badConfigMessage, Matchers.containsString("Received 417 response from server"));
        MatcherAssert.assertThat(badConfigMessage, Matchers.containsString("Mismatched configuration for topic abc: For config partitions, expected 7 got 1"));
        MatcherAssert.assertThat(badConfigMessage, Matchers.containsString("Cannot assert unknown topic property: FOO."));
        MatcherAssert.assertThat(badConfigMessage, Matchers.containsString("Error code: 41700"));
    }

    /**
     * Asserting that an existing topic does NOT exist must fail.
     *
     * <p>The client future is expected to complete exceptionally with an
     * {@link ExecutionException} whose cause is a {@link KsqlClientException} describing the
     * server's 417 response.
     */
    @Test
    public void shouldThrowOnAssertNotExistsNonExistingTopic() {
        // With an empty lambda nothing is thrown and assertThrows itself fails, so the
        // client call must live inside it. Topic "abc" is the one named in the expected
        // message below; `false` asserts non-existence.
        final Exception exc = Assert.assertThrows(
                ExecutionException.class,
                () -> this.client.assertTopic("abc", false).get());

        MatcherAssert.assertThat(exc.getCause(), Matchers.instanceOf(KsqlClientException.class));
        final String message = exc.getCause().getMessage();
        MatcherAssert.assertThat(message, Matchers.containsString("Received 417 response from server"));
        MatcherAssert.assertThat(message, Matchers.containsString("Topic abc exists"));
        MatcherAssert.assertThat(message, Matchers.containsString("Error code: 41700"));
    }

    /**
     * Asserts the existence of topic {@code abc}, first by literal name and then through
     * the client variable {@code name} together with its replica and partition counts.
     *
     * @throws ExecutionException if any assertion future completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void shouldAssertTopic() throws ExecutionException, InterruptedException {
        // Define the variable up front so "${name}" below can refer to it.
        client.define("name", "abc");

        client.assertTopic("abc", true).get();

        final Duration timeout = Duration.ofSeconds(3L);
        client.assertTopic("${name}", ImmutableMap.of("replicas", 1, "partitions", 1), true, timeout).get();
    }

    /**
     * Asserts the non-existence of topic {@code NONEXISTENT}, once with a configuration
     * map and once with an explicit timeout.
     *
     * @throws ExecutionException if any assertion future completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void shouldAssertNotExistsTopic() throws ExecutionException, InterruptedException {
        final String missingTopic = "NONEXISTENT";

        client.assertTopic(missingTopic, ImmutableMap.of("replicas", 1, "partitions", 1), false).get();
        client.assertTopic(missingTopic, false, Duration.ofSeconds(3L)).get();
    }

    /**
     * Creates a client that talks to {@code localhost} on the port of the REST app's
     * first listener, using this test's Vert.x instance.
     *
     * @return a newly created client
     */
    private Client createClient() {
        final ClientOptions options = ClientOptions.create().setHost("localhost");
        final URL firstListener = (URL) REST_APP.getListeners().get(0);
        return Client.create(options.setPort(firstListener.getPort()), vertx);
    }

    // Builds the REST app on top of the shared harness and chains the class-level rules.
    static {
        REST_APP = TestKsqlRestApp
                .builder(TEST_HARNESS::kafkaBootstrapServers)
                .withProperty("ksql.streams.num.stream.threads", 1)
                .withProperty("ksql.persistence.default.format.key", "JSON")
                .withProperty("ksql.headers.columns.enabled", true)
                .withStaticServiceContext(TEST_HARNESS::getServiceContext)
                .withProperty("ksql.schema.registry.url", "http://foo:8080")
                .build();

        // Rule order: retry (outermost), then the harness, then the REST app.
        // NOTE(review): the Retry arguments look like "3 attempts on ZooKeeperClientException,
        // 3 seconds apart" — confirm against Retry.of.
        CHAIN = RuleChain
                .outerRule(Retry.of(3, ZooKeeperClientException.class, 3L, TimeUnit.SECONDS))
                .around(TEST_HARNESS)
                .around(REST_APP);
    }
}
