/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.validator.integration;

import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
import io.confluent.kafka.schemaregistry.validator.GarbageKafkaAvroSerializer;
import io.confluent.kafka.schemaregistry.validator.integration.RecordSchemaValidatorIntegrationTestHarness;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that change schema-validation related configs at runtime through the
 * admin client and then check how the brokers treat records produced to the {@code test} topic.
 *
 * <p>Cluster, schema registry, admin client and the mock Avro record are all supplied by
 * {@link RecordSchemaValidatorIntegrationTestHarness}.
 */
public class AlterConfigsTest extends RecordSchemaValidatorIntegrationTestHarness {
    /** Upper bound, in milliseconds, for each wait on an expected produce outcome. */
    private static final int PRODUCE_RESULT_TIMEOUT_MS = 5000;

    /** Topic created in {@link #setUp()} and used by every test. */
    private static final String TOPIC = "test";

    /** Subject under which the sample schema is registered. */
    private static final String SUBJECT = "testSubject";

    /** Sample Avro record schema registered before producing. */
    private static final String USER_SCHEMA = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";

    // Topic-level configs altered by the tests.
    private static final String KEY_VALIDATION_CONFIG = "confluent.key.schema.validation";
    private static final String VALUE_VALIDATION_CONFIG = "confluent.value.schema.validation";
    private static final String KEY_STRATEGY_CONFIG = "confluent.key.subject.name.strategy";
    private static final String VALUE_STRATEGY_CONFIG = "confluent.value.subject.name.strategy";

    // Broker-level schema registry configs.
    private static final String SR_URL_CONFIG = "confluent.schema.registry.url";
    private static final String SR_MAX_CACHE_SIZE_CONFIG = "confluent.schema.registry.max.cache.size";
    private static final String SR_MAX_RETRIES_CONFIG = "confluent.schema.registry.max.retries";
    private static final String SR_RETRIES_WAIT_MS_CONFIG = "confluent.schema.registry.retries.wait.ms";

    private static final String EXPECT_SUCCESS_MESSAGE = "The future is supposed to succeed";
    private static final String EXPECT_FAILURE_MESSAGE = "The future is supposed to fail";

    /**
     * Configures the harness with {@link CompatibilityLevel#NONE}.
     * NOTE(review): the first two arguments are presumably the broker count and a harness
     * feature flag - confirm against the harness constructor.
     */
    public AlterConfigsTest() {
        super(3, true, CompatibilityLevel.NONE.name);
    }

    /**
     * Runs the harness set-up and then creates the {@code test} topic
     * (1 partition, replication factor 1) through the admin client.
     *
     * @throws Exception if the harness set-up or the topic creation fails
     */
    protected void setUp() throws Exception {
        super.setUp();
        this.createTopicThroughAdminClient(new NewTopic(TOPIC, 1, 1));
    }

    /**
     * Applies the given validation settings to the {@code test} topic with an incremental
     * alter-configs request, then describes the topic and asserts the new values are reported.
     *
     * @param keyValidation            value to set for {@code confluent.key.schema.validation}
     * @param valueValidation          value to set for {@code confluent.value.schema.validation}
     * @param keySubjectNameStrategy   key subject name strategy class name, or {@code null} to
     *                                 leave that config out of the request
     * @param valueSubjectNameStrategy value subject name strategy class name, or {@code null} to
     *                                 leave that config out of the request
     * @throws Exception if the alter or describe request fails
     */
    private void modifyTopicConfigs(boolean keyValidation, boolean valueValidation, String keySubjectNameStrategy, String valueSubjectNameStrategy) throws Exception {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);

        LinkedList<AlterConfigOp> ops = new LinkedList<>();
        ops.add(setOp(KEY_VALIDATION_CONFIG, Boolean.toString(keyValidation)));
        ops.add(setOp(VALUE_VALIDATION_CONFIG, Boolean.toString(valueValidation)));
        if (keySubjectNameStrategy != null) {
            ops.add(setOp(KEY_STRATEGY_CONFIG, keySubjectNameStrategy));
        }
        if (valueSubjectNameStrategy != null) {
            ops.add(setOp(VALUE_STRATEGY_CONFIG, valueSubjectNameStrategy));
        }

        AlterConfigsResult alterResult = this.adminClient.incrementalAlterConfigs(Collections.singletonMap(topicResource, ops));
        Assertions.assertEquals(Collections.singleton(topicResource), alterResult.values().keySet());
        alterResult.values().get(topicResource).get();

        // Fixed pause before describing; presumably lets the change propagate to the brokers.
        Thread.sleep(500L);

        DescribeConfigsResult describeResult = this.adminClient.describeConfigs(Collections.singletonList(topicResource));
        Map<ConfigResource, Config> configs = describeResult.all().get();
        Assertions.assertEquals(1, configs.size());

        Config topicConfig = configs.get(topicResource);
        Assertions.assertEquals(Boolean.toString(keyValidation), topicConfig.get(KEY_VALIDATION_CONFIG).value());
        Assertions.assertEquals(Boolean.toString(valueValidation), topicConfig.get(VALUE_VALIDATION_CONFIG).value());
        if (keySubjectNameStrategy != null) {
            Assertions.assertEquals(keySubjectNameStrategy, topicConfig.get(KEY_STRATEGY_CONFIG).value());
        }
        if (valueSubjectNameStrategy != null) {
            Assertions.assertEquals(valueSubjectNameStrategy, topicConfig.get(VALUE_STRATEGY_CONFIG).value());
        }
    }

    /** Builds a {@code SET} operation for a single config entry. */
    private static AlterConfigOp setOp(String name, String value) {
        return new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    }

    /**
     * Sends the mock Avro record (as both key and value) to the {@code test} topic and waits
     * for the result.
     *
     * @param kafkaProducer producer used for the send
     * @return {@code true} if the send completed normally, {@code false} if it failed with an
     *         {@link InvalidRecordException} as the cause; any other failure fails the test
     */
    private boolean getProduceResult(KafkaProducer<Object, Object> kafkaProducer) {
        try {
            ProducerRecord<Object, Object> message = new ProducerRecord<>(TOPIC, this.mockAvroRecord, this.mockAvroRecord);
            kafkaProducer.send(message).get();
            return true;
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
            Assertions.assertTrue(e.getCause() instanceof InvalidRecordException, "Produce failed for an unexpected reason: " + e);
            return false;
        }
    }

    /** Registers the sample user schema under {@code testSubject} via the REST client. */
    private void registerUserSchema() throws Exception {
        this.checkRegisterSubjectThroughRestClient(this.restApp, USER_SCHEMA, SUBJECT);
    }

    /** Waits until a produce attempt succeeds, failing after the produce timeout. */
    private void awaitProduceAccepted(KafkaProducer<Object, Object> kafkaProducer) throws Exception {
        TestUtils.waitUntilTrue(() -> this.getProduceResult(kafkaProducer), PRODUCE_RESULT_TIMEOUT_MS, EXPECT_SUCCESS_MESSAGE);
    }

    /** Waits until a produce attempt is rejected, failing after the produce timeout. */
    private void awaitProduceRejected(KafkaProducer<Object, Object> kafkaProducer) throws Exception {
        TestUtils.waitUntilTrue(() -> !this.getProduceResult(kafkaProducer), PRODUCE_RESULT_TIMEOUT_MS, EXPECT_FAILURE_MESSAGE);
    }

    /**
     * Produces succeed with validation off, are rejected once key and value validation are
     * switched on, and succeed again after validation is switched back off.
     */
    @Test
    public void testAlterConfigsRequestEnablesSchemaValidation() throws Exception {
        this.registerUserSchema();
        // Close the producer when the test ends so its client resources are released.
        try (KafkaProducer<Object, Object> kafkaProducer = this.createProducer(GarbageKafkaAvroSerializer.class, GarbageKafkaAvroSerializer.class)) {
            this.awaitProduceAccepted(kafkaProducer);

            this.modifyTopicConfigs(true, true, null, null);
            this.awaitProduceRejected(kafkaProducer);
            this.checkMetricsAreRecorded();

            this.modifyTopicConfigs(false, false, null, null);
            this.awaitProduceAccepted(kafkaProducer);
        }
    }

    /**
     * Produces are rejected while validation is on with a key {@link RecordNameStrategy}, and
     * succeed for every subsequent strategy change made with validation off.
     */
    @Test
    public void testAlterConfigsRequestSubjectNameStrategyChange() throws Exception {
        this.registerUserSchema();
        // Close the producer when the test ends so its client resources are released.
        try (KafkaProducer<Object, Object> kafkaProducer = this.createProducer(GarbageKafkaAvroSerializer.class, GarbageKafkaAvroSerializer.class)) {
            this.awaitProduceAccepted(kafkaProducer);

            this.modifyTopicConfigs(true, true, RecordNameStrategy.class.getName(), null);
            this.awaitProduceRejected(kafkaProducer);
            this.checkMetricsAreRecorded();

            this.modifyTopicConfigs(false, false, null, TopicRecordNameStrategy.class.getName());
            this.awaitProduceAccepted(kafkaProducer);
            this.checkMetricsAreRecorded();

            this.modifyTopicConfigs(false, false, null, RecordNameStrategy.class.getName());
            this.awaitProduceAccepted(kafkaProducer);
            this.checkMetricsAreRecorded();

            this.modifyTopicConfigs(false, false, TopicNameStrategy.class.getName(), RecordNameStrategy.class.getName());
            this.awaitProduceAccepted(kafkaProducer);
            this.checkMetricsAreRecorded();
        }
    }

    /**
     * Each schema registry broker config, altered on its own against the first broker, must be
     * rejected with an {@link InvalidRequestException} whose message says the config cannot be
     * updated dynamically.
     */
    @Test
    public void testAlterConfigsRequestSetsSchemaRegistryUrlFailed() {
        ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(((KafkaBroker)this.servers.get(0)).config().brokerId()));

        Map<String, String> invalidConfigs = new HashMap<>();
        invalidConfigs.put(SR_URL_CONFIG, "mock://scope");
        invalidConfigs.put(SR_MAX_CACHE_SIZE_CONFIG, "5000");
        invalidConfigs.put(SR_MAX_RETRIES_CONFIG, "3");
        invalidConfigs.put(SR_RETRIES_WAIT_MS_CONFIG, "300");

        String expectedErrorMessage = "Cannot update these configs dynamically";
        for (Map.Entry<String, String> configEntry : invalidConfigs.entrySet()) {
            AlterConfigsResult alterResult = this.adminClient.incrementalAlterConfigs(Collections.singletonMap(brokerResource, Collections.singletonList(setOp(configEntry.getKey(), configEntry.getValue()))));
            Assertions.assertEquals(Collections.singleton(brokerResource), alterResult.values().keySet());

            ExecutionException thrown = Assertions.assertThrows(ExecutionException.class, () -> alterResult.values().get(brokerResource).get());
            Throwable cause = thrown.getCause();
            Assertions.assertEquals(InvalidRequestException.class, cause.getClass());
            Assertions.assertTrue(cause.getMessage().contains(expectedErrorMessage), "Received error message : '" + cause.getMessage() + "' does not contain expected error message : '" + expectedErrorMessage + "'");
        }
    }

    /**
     * Describing the {@code test} topic must not list any of the schema registry broker configs.
     */
    @Test
    public void testDescribeTopicConfigsShouldNotIncludeBrokerConfigs() throws Exception {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
        DescribeConfigsResult describeResult = this.adminClient.describeConfigs(Collections.singletonList(topicResource));
        Set<String> configNames = describeResult.all().get().get(topicResource).entries().stream()
                .map(ConfigEntry::name)
                .collect(Collectors.toSet());

        Assertions.assertFalse(configNames.contains(SR_URL_CONFIG));
        Assertions.assertFalse(configNames.contains(SR_MAX_CACHE_SIZE_CONFIG));
        Assertions.assertFalse(configNames.contains(SR_MAX_RETRIES_CONFIG));
        Assertions.assertFalse(configNames.contains(SR_RETRIES_WAIT_MS_CONFIG));
    }
}

