package io.confluent.kafka.schemaregistry.validator.integration;

import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
import io.confluent.kafka.schemaregistry.validator.GarbageKafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Option;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/validator/integration/AlterConfigsTest.class */
public class AlterConfigsTest extends RecordSchemaValidatorIntegrationTestHarness {
    private static final int PRODUCE_RESULT_TIMEOUT_MS = 5000;

    public AlterConfigsTest() {
        super(1, true, CompatibilityLevel.NONE.name);
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        createTopicThroughAdminClient(new NewTopic("test", 1, (short) 1));
    }

    private void modifyTopicConfigs(boolean z, boolean z2, String str, String str2) throws Exception {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test");
        LinkedList linkedList = new LinkedList();
        linkedList.add(new AlterConfigOp(new ConfigEntry("confluent.key.schema.validation", Boolean.toString(z)), AlterConfigOp.OpType.SET));
        linkedList.add(new AlterConfigOp(new ConfigEntry("confluent.value.schema.validation", Boolean.toString(z2)), AlterConfigOp.OpType.SET));
        if (str != null) {
            linkedList.add(new AlterConfigOp(new ConfigEntry("confluent.key.subject.name.strategy", str), AlterConfigOp.OpType.SET));
        }
        if (str2 != null) {
            linkedList.add(new AlterConfigOp(new ConfigEntry("confluent.value.subject.name.strategy", str2), AlterConfigOp.OpType.SET));
        }
        AlterConfigsResult incrementalAlterConfigs = this.adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, linkedList));
        Assert.assertEquals(Collections.singleton(configResource), incrementalAlterConfigs.values().keySet());
        ((KafkaFuture) incrementalAlterConfigs.values().get(configResource)).get();
        Map map = (Map) this.adminClient.describeConfigs(Collections.singletonList(configResource)).all().get();
        Assert.assertEquals(1L, map.size());
        Assert.assertEquals(Boolean.toString(z), ((Config) map.get(configResource)).get("confluent.key.schema.validation").value());
        Assert.assertEquals(Boolean.toString(z2), ((Config) map.get(configResource)).get("confluent.value.schema.validation").value());
        if (str != null) {
            Assert.assertEquals(str, ((Config) map.get(configResource)).get("confluent.key.subject.name.strategy").value());
        }
        if (str2 != null) {
            Assert.assertEquals(str2, ((Config) map.get(configResource)).get("confluent.value.subject.name.strategy").value());
        }
    }

    private boolean getProduceResult(KafkaProducer<Object, Object> kafkaProducer) {
        try {
            kafkaProducer.send(new ProducerRecord("test", this.mockAvroRecord, this.mockAvroRecord)).get();
            return true;
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof InvalidRecordException);
            return false;
        }
    }

    @Test
    public void testAlterConfigsRequestEnablesSchemaValidation() throws Exception {
        checkRegisterSubjectThroughRestClient(this.restApp, "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "testSubject");
        KafkaProducer<Object, Object> createProducer = createProducer(GarbageKafkaAvroSerializer.class, GarbageKafkaAvroSerializer.class);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(getProduceResult(createProducer));
        }, 5000L, "The future is supposed to succeed");
        modifyTopicConfigs(true, true, null, null);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(!getProduceResult(createProducer));
        }, 5000L, "The future is supposed to fail");
        checkMetricsAreRecorded();
        modifyTopicConfigs(false, false, null, null);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(getProduceResult(createProducer));
        }, 5000L, "The future is supposed to succeed");
    }

    @Test
    public void testAlterConfigsRequestSubjectNameStrategyChange() throws Exception {
        checkRegisterSubjectThroughRestClient(this.restApp, "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "testSubject");
        KafkaProducer<Object, Object> createProducer = createProducer(GarbageKafkaAvroSerializer.class, GarbageKafkaAvroSerializer.class);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(getProduceResult(createProducer));
        }, 5000L, "The future is supposed to succeed");
        modifyTopicConfigs(true, true, RecordNameStrategy.class.getName(), null);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(!getProduceResult(createProducer));
        }, 5000L, "The future is supposed to fail");
        checkMetricsAreRecorded();
        modifyTopicConfigs(false, false, null, TopicRecordNameStrategy.class.getName());
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(getProduceResult(createProducer));
        }, 5000L, "The future is supposed to succeed");
        checkMetricsAreRecorded();
        modifyTopicConfigs(false, false, null, RecordNameStrategy.class.getName());
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(getProduceResult(createProducer));
        }, 5000L, "The future is supposed to succeed");
        checkMetricsAreRecorded();
        modifyTopicConfigs(false, false, TopicNameStrategy.class.getName(), RecordNameStrategy.class.getName());
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(getProduceResult(createProducer));
        }, 5000L, "The future is supposed to succeed");
        checkMetricsAreRecorded();
    }

    @Test
    public void testAlterConfigsRequestSetsSchemaRegistryUrlFailed() {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(((KafkaServer) this.servers.get(0)).config().brokerId()));
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.schema.registry.url", "mock://scope");
        hashMap.put("confluent.schema.registry.max.cache.size", "5000");
        hashMap.put("confluent.schema.registry.max.retries", "3");
        hashMap.put("confluent.schema.registry.retries.wait.ms", "300");
        for (Map.Entry entry : hashMap.entrySet()) {
            AlterConfigsResult incrementalAlterConfigs = this.adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry((String) entry.getKey(), (String) entry.getValue()), AlterConfigOp.OpType.SET))));
            Assert.assertEquals(Collections.singleton(configResource), incrementalAlterConfigs.values().keySet());
            kafka.utils.TestUtils.assertFutureExceptionTypeEquals((Future) incrementalAlterConfigs.values().get(configResource), InvalidRequestException.class, Option.apply("Cannot update these configs dynamically"));
        }
    }

    @Test
    public void testDescribeTopicConfigsShouldNotIncludeBrokerConfigs() throws Exception {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test");
        Set set = (Set) ((Config) ((Map) this.adminClient.describeConfigs(Collections.singletonList(configResource)).all().get()).get(configResource)).entries().stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toSet());
        Assert.assertFalse(set.contains("confluent.schema.registry.url"));
        Assert.assertFalse(set.contains("confluent.schema.registry.max.cache.size"));
        Assert.assertFalse(set.contains("confluent.schema.registry.max.retries"));
        Assert.assertFalse(set.contains("confluent.schema.registry.retries.wait.ms"));
    }
}
