package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.class */
public class StreamsKafkaClientTest {
    private static final String TOPIC = "topic";
    private final MockClient kafkaClient = new MockClient(new MockTime());
    private final List<MetricsReporter> reporters = Collections.emptyList();
    private final MetadataResponse metadata = new MetadataResponse(Collections.singletonList(new Node(1, "host", 90)), "cluster", 1, Collections.emptyList());
    private final Map<String, Object> config = new HashMap();
    private final InternalTopicConfig topicConfigWithNoOverrides = new InternalTopicConfig(TOPIC, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.emptyMap());
    private final Map<String, String> overridenTopicConfig = Collections.singletonMap("delete.retention.ms", "100");
    private final InternalTopicConfig topicConfigWithOverrides = new InternalTopicConfig(TOPIC, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), this.overridenTopicConfig);

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest$TestMetricsReporter.class */
    public static class TestMetricsReporter implements MetricsReporter {
        static final Map<MetricName, KafkaMetric> METRICS = new HashMap();

        public void configure(Map<String, ?> map) {
        }

        public void init(List<KafkaMetric> list) {
            Iterator<KafkaMetric> it = list.iterator();
            while (it.hasNext()) {
                metricChange(it.next());
            }
        }

        public void metricChange(KafkaMetric kafkaMetric) {
            METRICS.put(kafkaMetric.metricName(), kafkaMetric);
        }

        public void metricRemoval(KafkaMetric kafkaMetric) {
            METRICS.remove(kafkaMetric.metricName());
        }

        public void close() {
            METRICS.clear();
        }
    }

    @Before
    public void before() {
        this.config.put("application.id", "some_app_id");
        this.config.put("bootstrap.servers", "localhost:9000");
    }

    @Test
    public void testConfigFromStreamsConfig() {
        for (String str : Arrays.asList("PLAIN", "SCRAM-SHA-512")) {
            this.config.put("sasl.mechanism", str);
            StreamsKafkaClient.Config fromStreamsConfig = StreamsKafkaClient.Config.fromStreamsConfig(new StreamsConfig(this.config));
            Assert.assertEquals(str, fromStreamsConfig.values().get("sasl.mechanism"));
            Assert.assertEquals(str, fromStreamsConfig.getString("sasl.mechanism"));
        }
    }

    @Test
    public void shouldAddCleanupPolicyToTopicConfigWhenCreatingTopic() throws Exception {
        verifyCorrectTopicConfigs(createStreamsKafkaClient(), this.topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy", "delete"));
    }

    @Test
    public void shouldAddDefaultTopicConfigFromStreamConfig() throws Exception {
        this.config.put(StreamsConfig.topicPrefix("segment.ms"), "100");
        this.config.put(StreamsConfig.topicPrefix("compression.type"), "gzip");
        HashMap hashMap = new HashMap();
        hashMap.put("segment.ms", "100");
        hashMap.put("compression.type", "gzip");
        hashMap.put("cleanup.policy", "delete");
        verifyCorrectTopicConfigs(createStreamsKafkaClient(), this.topicConfigWithNoOverrides, hashMap);
    }

    @Test
    public void shouldSetPropertiesDefinedByInternalTopicConfig() throws Exception {
        HashMap hashMap = new HashMap(this.overridenTopicConfig);
        hashMap.put("cleanup.policy", "compact");
        verifyCorrectTopicConfigs(createStreamsKafkaClient(), this.topicConfigWithOverrides, hashMap);
    }

    @Test
    public void shouldOverrideDefaultTopicConfigsFromStreamsConfig() throws Exception {
        this.config.put(StreamsConfig.topicPrefix("delete.retention.ms"), "99999");
        this.config.put(StreamsConfig.topicPrefix("segment.ms"), "988");
        HashMap hashMap = new HashMap(this.overridenTopicConfig);
        hashMap.put("cleanup.policy", "compact");
        hashMap.put("delete.retention.ms", "100");
        hashMap.put("segment.ms", "988");
        verifyCorrectTopicConfigs(createStreamsKafkaClient(), this.topicConfigWithOverrides, hashMap);
    }

    @Test
    public void shouldNotAllowNullTopicConfigs() throws Exception {
        this.config.put(StreamsConfig.topicPrefix("delete.retention.ms"), null);
        verifyCorrectTopicConfigs(createStreamsKafkaClient(), this.topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy", "delete"));
    }

    @Test
    public void metricsShouldBeTaggedWithClientId() {
        this.config.put("client.id", "some_client_id");
        this.config.put("metric.reporters", TestMetricsReporter.class.getName());
        StreamsKafkaClient.create(new StreamsConfig(this.config));
        Assert.assertFalse(TestMetricsReporter.METRICS.isEmpty());
        Iterator<KafkaMetric> it = TestMetricsReporter.METRICS.values().iterator();
        while (it.hasNext()) {
            Assert.assertEquals("some_client_id", it.next().metricName().tags().get("client-id"));
        }
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnEmptyBrokerCompatibilityResponse() {
        this.kafkaClient.prepareResponse((AbstractResponse) null);
        createStreamsKafkaClient().checkBrokerCompatibility(false);
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionWhenBrokerCompatibilityResponseInconsistent() {
        this.kafkaClient.prepareResponse(new ProduceResponse(Collections.emptyMap()));
        createStreamsKafkaClient().checkBrokerCompatibility(false);
    }

    @Test(expected = StreamsException.class)
    public void shouldRequireBrokerVersion0101OrHigherWhenEosDisabled() {
        this.kafkaClient.prepareResponse(new ApiVersionsResponse(Errors.NONE, Collections.singletonList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE))));
        createStreamsKafkaClient().checkBrokerCompatibility(false);
    }

    @Test(expected = StreamsException.class)
    public void shouldRequireBrokerVersions0110OrHigherWhenEosEnabled() {
        this.kafkaClient.prepareResponse(new ApiVersionsResponse(Errors.NONE, Collections.singletonList(new ApiVersionsResponse.ApiVersion(ApiKeys.CREATE_TOPICS))));
        createStreamsKafkaClient().checkBrokerCompatibility(true);
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnEmptyFetchMetadataResponse() {
        this.kafkaClient.prepareResponse((AbstractResponse) null);
        createStreamsKafkaClient().fetchMetadata();
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionWhenFetchMetadataResponseInconsistent() {
        this.kafkaClient.prepareResponse(new ProduceResponse(Collections.emptyMap()));
        createStreamsKafkaClient().fetchMetadata();
    }

    private void verifyCorrectTopicConfigs(StreamsKafkaClient streamsKafkaClient, InternalTopicConfig internalTopicConfig, Map<String, String> map) {
        final HashMap hashMap = new HashMap();
        this.kafkaClient.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.streams.processor.internals.StreamsKafkaClientTest.1
            public boolean matches(AbstractRequest abstractRequest) {
                if (!(abstractRequest instanceof CreateTopicsRequest)) {
                    return false;
                }
                hashMap.putAll(((CreateTopicsRequest.TopicDetails) ((CreateTopicsRequest) abstractRequest).topics().get(StreamsKafkaClientTest.TOPIC)).configs);
                return true;
            }
        }, new CreateTopicsResponse(Collections.singletonMap(TOPIC, ApiError.NONE)));
        streamsKafkaClient.createTopics(Collections.singletonMap(internalTopicConfig, 1), 1, 1L, this.metadata);
        Assert.assertThat(hashMap, CoreMatchers.equalTo(map));
    }

    private StreamsKafkaClient createStreamsKafkaClient() {
        return new StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(new StreamsConfig(this.config)), this.kafkaClient, this.reporters);
    }
}
