package org.apache.kafka.connect.integration;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.connect.runtime.ErrorHandlingTaskTest;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.class */
public class ConnectorTopicsIntegrationTest {
    private static final int NUM_WORKERS = 5;
    private static final int NUM_TASKS = 1;
    private static final String FOO_TOPIC = "foo-topic";
    private static final String FOO_CONNECTOR = "foo-source";
    private static final String BAR_TOPIC = "bar-topic";
    private static final String BAR_CONNECTOR = "bar-source";
    private static final String SINK_CONNECTOR = "baz-sink";
    private static final int NUM_TOPIC_PARTITIONS = 3;
    private EmbeddedConnectCluster.Builder connectBuilder;
    private EmbeddedConnectCluster connect;
    Map<String, String> workerProps = new HashMap();
    Properties brokerProps = new Properties();

    @Before
    public void setup() {
        this.workerProps.put("connector.client.config.override.policy", "All");
        this.brokerProps.put("auto.create.topics.enable", String.valueOf(false));
        this.connectBuilder = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(NUM_WORKERS).workerProps(this.workerProps).brokerProps(this.brokerProps).maskExitProcedures(true);
    }

    @After
    public void close() {
        this.connect.stop();
    }

    @Test
    public void testGetActiveTopics() throws InterruptedException {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
        this.connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
        this.connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
        this.connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for connector: foo-source");
        this.connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, 1, "Connector tasks did not start in time.");
        this.connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.singletonList(FOO_TOPIC), "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + FOO_CONNECTOR);
        this.connect.configureConnector(BAR_CONNECTOR, defaultSourceConnectorProps(BAR_TOPIC));
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(BAR_CONNECTOR, 1, "Connector tasks did not start in time.");
        this.connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.singletonList(BAR_TOPIC), "Active topic set is not: " + Collections.singletonList(BAR_TOPIC) + " for connector: " + BAR_CONNECTOR);
        this.connect.configureConnector(SINK_CONNECTOR, defaultSinkConnectorProps(FOO_TOPIC, BAR_TOPIC));
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(SINK_CONNECTOR, 1, "Connector tasks did not start in time.");
        this.connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Arrays.asList(FOO_TOPIC, BAR_TOPIC), "Active topic set is not: " + Arrays.asList(FOO_TOPIC, BAR_TOPIC) + " for connector: " + SINK_CONNECTOR);
        this.connect.deleteConnector(BAR_CONNECTOR);
        this.connect.assertions().assertConnectorDoesNotExist(BAR_CONNECTOR, "Connector wasn't deleted in time.");
        this.connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for deleted connector: bar-source");
        Thread.sleep(ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        this.connect.resetConnectorTopics(SINK_CONNECTOR);
        this.connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Collections.singletonList(FOO_TOPIC), "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
    }

    @Test
    public void testTopicTrackingResetIsDisabled() throws InterruptedException {
        this.workerProps.put("topic.tracking.allow.reset", "false");
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
        this.connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
        this.connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
        this.connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for connector: foo-source");
        this.connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, 1, "Connector tasks did not start in time.");
        this.connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.singletonList(FOO_TOPIC), "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + FOO_CONNECTOR);
        this.connect.configureConnector(SINK_CONNECTOR, defaultSinkConnectorProps(FOO_TOPIC));
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(SINK_CONNECTOR, 1, "Connector tasks did not start in time.");
        this.connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Arrays.asList(FOO_TOPIC), "Active topic set is not: " + Arrays.asList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
        this.connect.deleteConnector(FOO_CONNECTOR);
        this.connect.assertions().assertConnectorDoesNotExist(FOO_CONNECTOR, "Connector wasn't deleted in time.");
        this.connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for deleted connector: foo-source");
        Thread.sleep(ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        Assert.assertTrue(((Exception) Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.resetConnectorTopics(SINK_CONNECTOR);
        })).getMessage().contains("Topic tracking reset is disabled."));
        this.connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Collections.singletonList(FOO_TOPIC), "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
    }

    @Test
    public void testTopicTrackingIsDisabled() throws InterruptedException {
        this.workerProps.put("topic.tracking.enable", "false");
        this.connect = this.connectBuilder.build();
        this.connect.start();
        this.connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
        this.connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
        this.connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
        this.connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, 1, "Connector tasks did not start in time.");
        Assert.assertTrue(((Exception) Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.resetConnectorTopics(SINK_CONNECTOR);
        })).getMessage().contains("Topic tracking is disabled."));
        Assert.assertTrue(((Exception) Assert.assertThrows(ConnectRestException.class, () -> {
            this.connect.connectorTopics(SINK_CONNECTOR);
        })).getMessage().contains("Topic tracking is disabled."));
        Thread.sleep(ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        assertNoTopicStatusInStatusTopic();
    }

    /* JADX WARN: Code restructure failed: missing block: B:16:0x00d4, code lost:
    
        java.util.stream.StreamSupport.stream(r0.poll(java.time.Duration.ofMillis(2147483647L)).spliterator(), false).map((v0) -> { // java.util.function.Function.apply(java.lang.Object):java.lang.Object
            return v0.key();
        }).filter((v0) -> { // java.util.function.Predicate.test(java.lang.Object):boolean
            return java.util.Objects.nonNull(v0);
        }).filter((v0) -> { // java.util.function.Predicate.test(java.lang.Object):boolean
            return lambda$assertNoTopicStatusInStatusTopic$4(v0);
        }).findFirst().ifPresent((v0) -> { // java.util.function.Consumer.accept(java.lang.Object):void
            lambda$assertNoTopicStatusInStatusTopic$5(v0);
        });
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void assertNoTopicStatusInStatusTopic() {
        /*
            Method dump skipped, instructions count: 298
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest.assertNoTopicStatusInStatusTopic():void");
    }

    private Map<String, String> defaultSourceConnectorProps(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, str);
        hashMap.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, String.valueOf(10));
        hashMap.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, String.valueOf(10));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
        hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        return hashMap;
    }

    private Map<String, String> defaultSinkConnectorProps(String... strArr) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put("topics", String.join(",", strArr));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        return hashMap;
    }
}
