package org.apache.kafka.connect.integration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests that run the monitorable source and sink connectors on an embedded
 * Connect cluster with the {@code trace.records.*} connector settings applied.
 *
 * <p>Each test verifies that the connector processes the expected number of records, that the
 * (regex-routed) trace topic exists once the connector has been removed, and that
 * {@link #TEST_RECORD_COUNT} records can be consumed from it.
 */
@Category({IntegrationTest.class})
public class ConnectorTracingIntegrationTest {
    /** Maximum time (ms) to wait for the connector handle to observe the expected records. */
    private static final long RECORDS_TIMEOUT = 30000;
    /** Maximum time (ms) to wait for the connector handle to observe the expected commits. */
    private static final long COMMIT_TIMEOUT = 30000;
    /** Maximum time (ms) to wait when consuming records back from a topic. */
    private static final long CONSUME_TIMEOUT = 30000;
    /** Maximum time (ms) to wait for sink tasks to receive their partition assignment. */
    private static final long CONNECTOR_SETUP_DURATION_MS = 60000;
    /** Value used for the worker's {@code offset.flush.interval.ms} setting. */
    private static final long OFFSET_FLUSH_INTERVAL_MS = 5000L;

    /** Number of records every test expects to flow through the connector. */
    private static final int TEST_RECORD_COUNT = 2000;
    /** Number of tasks per connector; the sink topic gets one partition per task. */
    private static final int NUM_TASKS = 2;

    private static final String SOURCE_CONNECTOR_NAME = "TracedSourceConnector";
    private static final String SINK_CONNECTOR_NAME = "TracedSinkConnector";
    private static final String SOURCE_TOPIC = "ConnectorSourceTopic";
    private static final String SINK_TOPIC = "ConnectorSinkTopic";
    private static final String TRACE_TOPIC = "ConnectorTraceTopic";

    private EmbeddedConnectCluster connectCluster;

    /**
     * Builds and starts an embedded Connect cluster whose brokers have automatic topic
     * creation disabled and whose workers allow all client config overrides.
     */
    @Before
    public void setup() throws InterruptedException {
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("connector.client.config.override.policy", "All");
        workerProps.put("offset.flush.interval.ms", String.valueOf(OFFSET_FLUSH_INTERVAL_MS));

        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", "false");

        this.connectCluster = new EmbeddedConnectCluster.Builder()
                .name("testCluster")
                .workerProps(workerProps)
                .brokerProps(brokerProps)
                .maskExitProcedures(true)
                .build();
        this.connectCluster.start();
    }

    /**
     * Stops the embedded cluster (if it was created) and removes the connector handles this
     * class registered, so that state does not carry over into the next test.
     */
    @After
    public void teardown() {
        try {
            // setup() may have failed before the cluster was assigned; avoid masking that
            // failure with a NullPointerException here.
            if (this.connectCluster != null) {
                this.connectCluster.stop();
            }
        } finally {
            // RuntimeHandles is a shared registry and the tests reuse connector names.
            RuntimeHandles.get().deleteConnector(SOURCE_CONNECTOR_NAME);
            RuntimeHandles.get().deleteConnector(SINK_CONNECTOR_NAME);
        }
    }

    /**
     * Builds the settings for a single {@code RegexRouter} transformation named
     * {@code topicChange} that rewrites every topic name to {@code replacement}.
     *
     * @param transformsKey the config key under which the transformation is declared
     *                      (for example {@code transforms})
     * @param replacement   the topic name every record is routed to
     * @return a new mutable map holding the transformation settings
     */
    private Map<String, String> regexTransform(String transformsKey, String replacement) {
        String prefix = transformsKey + ".topicChange.";
        Map<String, String> props = new HashMap<>();
        props.put(transformsKey, "topicChange");
        props.put(prefix + "type", "org.apache.kafka.connect.transforms.RegexRouter");
        props.put(prefix + "regex", ".*");
        props.put(prefix + "replacement", replacement);
        return props;
    }

    /**
     * Builds the base configuration for a {@link MonitorableSourceConnector} whose records are
     * routed to {@code topic} by a regex transformation.
     *
     * @param topic    the topic records end up in after the transformation
     * @param numTasks the value for {@code tasks.max}
     * @return a new mutable configuration map
     */
    private Map<String, String> defaultSourceConnectorConfigs(String topic, int numTasks) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        props.put("tasks.max", String.valueOf(numTasks));
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, "pre-transformed-connect-topic");
        props.put("throughput", String.valueOf(500));
        props.put("key.converter", JsonConverter.class.getName());
        props.put("value.converter", JsonConverter.class.getName());
        props.put("topic.creation.default.replication.factor", String.valueOf(1));
        props.put("topic.creation.default.partitions", String.valueOf(1));
        props.putAll(regexTransform("transforms", topic));
        return props;
    }

    /**
     * Builds the base configuration for a {@link MonitorableSinkConnector} that reads from
     * {@code topic}.
     *
     * @param topic    the topic the sink subscribes to (also used as the transformation target)
     * @param numTasks the value for {@code tasks.max}
     * @return a new mutable configuration map
     */
    private Map<String, String> defaultSinkConnectorConfigs(String topic, int numTasks) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSinkConnector.class.getName());
        props.put("tasks.max", String.valueOf(numTasks));
        props.put("topics", topic);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.putAll(regexTransform("transforms", topic));
        return props;
    }

    /**
     * Adds the {@code trace.records.*} settings to {@code map}, including a regex
     * transformation that routes trace records to {@code traceTopic}.
     *
     * @param map        the connector configuration to extend; it is modified in place
     * @param traceTopic the topic trace records end up in after the transformation
     * @return the same {@code map} instance, for chaining
     */
    private Map<String, String> addTraceConfigs(Map<String, String> map, String traceTopic) {
        map.put("trace.records.topic", "pre-transformed-trace-topic");
        map.put("trace.records.enable", "true");
        map.put("trace.records.topic.partition", "1");
        map.put("trace.records.topic.replication.factor", "1");
        map.put("trace.records.value.converter", JsonConverter.class.getName());
        map.put("trace.records.key.converter", JsonConverter.class.getName());
        map.put("trace.records.header.converter", JsonConverter.class.getName());
        map.putAll(regexTransform("trace.records.transforms", traceTopic));
        return map;
    }

    /**
     * Turns on {@code schemas.enable} for the trace key and value converters.
     *
     * @param props the connector configuration to extend; it is modified in place
     */
    private void enableTraceSchemas(Map<String, String> props) {
        props.put("trace.records.key.converter.schemas.enable", "true");
        props.put("trace.records.value.converter.schemas.enable", "true");
    }

    @Test
    public void testSourceConnectorWithConnectorTracing() throws InterruptedException {
        runSourceConnectorTest(false);
    }

    @Test
    public void testSourceConnectorWithConnectorTracingWithSchemaEnable() throws InterruptedException {
        runSourceConnectorTest(true);
    }

    @Test
    public void testSinkConnectorWithConnectorTracing() throws InterruptedException {
        runSinkConnectorTest(false);
    }

    @Test
    public void testSinkConnectorWithConnectorTracingWithSchemaEnable() throws InterruptedException {
        runSinkConnectorTest(true);
    }

    /**
     * Runs a traced source connector until the expected records and commits are observed,
     * removes it, and then checks that both the source topic and the trace topic yield
     * {@link #TEST_RECORD_COUNT} records.
     *
     * @param traceSchemasEnabled whether to enable schemas on the trace key/value converters
     */
    private void runSourceConnectorTest(boolean traceSchemasEnabled) throws InterruptedException {
        this.connectCluster.assertions().assertAtLeastNumWorkersAreUp(1, "Failed to start a connect worker");
        // Broker-side auto creation is disabled in setup(), so create the target topic up front.
        this.connectCluster.kafka().createTopic(SOURCE_TOPIC);
        this.connectCluster.assertions().assertTopicsExist(SOURCE_TOPIC);

        Map<String, String> props = addTraceConfigs(defaultSourceConnectorConfigs(SOURCE_TOPIC, NUM_TASKS), TRACE_TOPIC);
        if (traceSchemasEnabled) {
            enableTraceSchemas(props);
        }

        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(SOURCE_CONNECTOR_NAME);
        connectorHandle.expectedCommits(TEST_RECORD_COUNT);
        connectorHandle.expectedRecords(TEST_RECORD_COUNT);

        this.connectCluster.configureConnector(SOURCE_CONNECTOR_NAME, props);
        this.connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(
                SOURCE_CONNECTOR_NAME, NUM_TASKS, "Failed to start connector with " + NUM_TASKS + " required tasks");

        connectorHandle.awaitCommits(COMMIT_TIMEOUT);
        connectorHandle.awaitRecords(RECORDS_TIMEOUT);

        this.connectCluster.deleteConnector(SOURCE_CONNECTOR_NAME);
        this.connectCluster.assertions().assertConnectorAndTasksAreStopped(SOURCE_CONNECTOR_NAME, "Failed to stop connector");

        this.connectCluster.assertions().assertTopicsExist(TRACE_TOPIC);
        this.connectCluster.kafka().consume(TEST_RECORD_COUNT, CONSUME_TIMEOUT, SOURCE_TOPIC);
        this.connectCluster.kafka().consume(TEST_RECORD_COUNT, CONSUME_TIMEOUT, TRACE_TOPIC);
    }

    /**
     * Runs a traced sink connector, feeds it {@link #TEST_RECORD_COUNT} records, waits until
     * the expected records and commits are observed, removes it, and then checks that the
     * trace topic yields {@link #TEST_RECORD_COUNT} records.
     *
     * @param traceSchemasEnabled whether to enable schemas on the trace key/value converters
     */
    private void runSinkConnectorTest(boolean traceSchemasEnabled) throws InterruptedException {
        this.connectCluster.assertions().assertAtLeastNumWorkersAreUp(1, "Failed to start a connect worker");
        // One partition per task, so that every task can be assigned exactly one partition.
        this.connectCluster.kafka().createTopic(SINK_TOPIC, NUM_TASKS);
        this.connectCluster.assertions().assertTopicsExist(SINK_TOPIC);

        Map<String, String> props = addTraceConfigs(defaultSinkConnectorConfigs(SINK_TOPIC, NUM_TASKS), TRACE_TOPIC);
        if (traceSchemasEnabled) {
            enableTraceSchemas(props);
        }

        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(SINK_CONNECTOR_NAME);
        connectorHandle.expectedCommits(TEST_RECORD_COUNT);
        connectorHandle.expectedRecords(TEST_RECORD_COUNT);

        this.connectCluster.configureConnector(SINK_CONNECTOR_NAME, props);
        TestUtils.waitForCondition(
                () -> sinkPartitionsAssigned(connectorHandle),
                CONNECTOR_SETUP_DURATION_MS,
                "Failed to assign topic partitions to tasks ");

        for (int i = 0; i < TEST_RECORD_COUNT; i++) {
            this.connectCluster.kafka().produce(SINK_TOPIC, "key", SINK_CONNECTOR_NAME + "-" + i);
        }
        this.connectCluster.kafka().consume(TEST_RECORD_COUNT, CONSUME_TIMEOUT, SINK_TOPIC);

        connectorHandle.awaitRecords(RECORDS_TIMEOUT);
        connectorHandle.awaitCommits(COMMIT_TIMEOUT);

        this.connectCluster.deleteConnector(SINK_CONNECTOR_NAME);
        this.connectCluster.assertions().assertConnectorAndTasksAreStopped(SINK_CONNECTOR_NAME, "Failed to stop connector");

        this.connectCluster.assertions().assertTopicsExist(TRACE_TOPIC);
        this.connectCluster.kafka().consume(TEST_RECORD_COUNT, CONSUME_TIMEOUT, TRACE_TOPIC);
    }

    /**
     * Reports whether the sink connector has {@link #NUM_TASKS} tasks and every task handle
     * has exactly one partition assigned.
     *
     * @param connectorHandle the handle whose tasks are inspected
     * @return {@code true} once the assignment is complete; {@code false} otherwise, including
     *         when the connector status cannot be read
     */
    private boolean sinkPartitionsAssigned(ConnectorHandle connectorHandle) {
        try {
            ConnectorStateInfo status = this.connectCluster.connectorStatus(SINK_CONNECTOR_NAME);
            return status != null
                    && status.tasks().size() == NUM_TASKS
                    && connectorHandle.tasks().stream().allMatch(task -> task.numPartitionsAssigned() == 1);
        } catch (Exception ignored) {
            // Deliberately best-effort: presumably the status lookup can fail while the
            // connector is still coming up, so report "not ready" and let the caller retry.
            return false;
        }
    }
}
