package org.apache.kafka.connect.integration;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.class */
public class SinkConnectorsIntegrationTest {
    private static final int NUM_TASKS = 1;
    private static final int NUM_WORKERS = 1;
    private static final String CONNECTOR_NAME = "connect-integration-test-sink";
    private static final long TASK_CONSUME_TIMEOUT_MS = 10000;
    private EmbeddedConnectCluster connect;

    @Before
    public void setup() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.client.config.override.policy", "All");
        Properties properties = new Properties();
        properties.put("auto.create.topics.enable", "false");
        properties.put("delete.topic.enable", "true");
        this.connect = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(1).workerProps(hashMap).brokerProps(properties).build();
        this.connect.start();
        this.connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
    }

    @After
    public void close() {
        RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
        this.connect.stop();
    }

    @Test
    public void testEagerConsumerPartitionAssignment() throws Exception {
        TopicPartition topicPartition = new TopicPartition("topic1", 0);
        TopicPartition topicPartition2 = new TopicPartition("topic2", 0);
        TopicPartition topicPartition3 = new TopicPartition("topic3", 0);
        Map<String, String> baseSinkConnectorProps = baseSinkConnectorProps(String.join(",", Arrays.asList("topic1", "topic2", "topic3")));
        baseSinkConnectorProps.put("consumer.override.partition.assignment.strategy", RoundRobinAssignor.class.getName());
        baseSinkConnectorProps.put("consumer.override.default.api.timeout.ms", "5000");
        HashSet hashSet = new HashSet();
        TaskHandle taskHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME).taskHandle("connect-integration-test-sink-0", sinkRecord -> {
            Assert.assertTrue("Task received duplicate record from Connect", hashSet.add(Objects.toString(sinkRecord.value())));
        });
        this.connect.configureConnector(CONNECTOR_NAME, baseSinkConnectorProps);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector tasks did not start in time.");
        Assert.assertEquals(0L, taskHandle.numPartitionsAssigned());
        HashSet hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        this.connect.kafka().createTopic("topic1", 1);
        hashSet3.add(topicPartition);
        this.connect.kafka().produce("topic1", "t1v1");
        hashSet2.add("t1v1");
        TestUtils.waitForCondition(() -> {
            return hashSet2.equals(hashSet);
        }, TASK_CONSUME_TIMEOUT_MS, "Task did not receive records in time");
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition));
        Assert.assertEquals(hashSet3, taskHandle.assignment());
        this.connect.kafka().createTopic("topic2", 1);
        hashSet3.add(topicPartition2);
        this.connect.kafka().produce("topic2", "t2v1");
        hashSet2.add("t2v1");
        this.connect.kafka().produce("topic2", "t1v2");
        hashSet2.add("t1v2");
        TestUtils.waitForCondition(() -> {
            return hashSet2.equals(hashSet);
        }, TASK_CONSUME_TIMEOUT_MS, "Task did not receive records in time");
        Assert.assertEquals(2L, taskHandle.timesAssigned(topicPartition));
        Assert.assertEquals(1L, taskHandle.timesRevoked(topicPartition));
        Assert.assertEquals(1L, taskHandle.timesCommitted(topicPartition));
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition2));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition2));
        Assert.assertEquals(hashSet3, taskHandle.assignment());
        this.connect.kafka().createTopic("topic3", 1);
        hashSet3.add(topicPartition3);
        this.connect.kafka().produce("topic3", "t3v1");
        hashSet2.add("t3v1");
        this.connect.kafka().produce("topic2", "t2v2");
        hashSet2.add("t2v2");
        this.connect.kafka().produce("topic2", "t1v3");
        hashSet2.add("t1v3");
        hashSet3.add(topicPartition3);
        TestUtils.waitForCondition(() -> {
            return hashSet2.equals(hashSet);
        }, TASK_CONSUME_TIMEOUT_MS, "Task did not receive records in time");
        Assert.assertEquals(3L, taskHandle.timesAssigned(topicPartition));
        Assert.assertEquals(2L, taskHandle.timesRevoked(topicPartition));
        Assert.assertEquals(2L, taskHandle.timesCommitted(topicPartition));
        Assert.assertEquals(2L, taskHandle.timesAssigned(topicPartition2));
        Assert.assertEquals(1L, taskHandle.timesRevoked(topicPartition2));
        Assert.assertEquals(1L, taskHandle.timesCommitted(topicPartition2));
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition3));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition3));
        Assert.assertEquals(hashSet3, taskHandle.assignment());
        this.connect.kafka().deleteTopic("topic1");
        hashSet3.remove(topicPartition);
        this.connect.kafka().produce("topic3", "t3v2");
        hashSet2.add("t3v2");
        this.connect.kafka().produce("topic2", "t2v3");
        hashSet2.add("t2v3");
        TestUtils.waitForCondition(() -> {
            return hashSet2.equals(hashSet) && hashSet3.equals(taskHandle.assignment());
        }, TASK_CONSUME_TIMEOUT_MS, "Timed out while waiting for task to receive records and updated topic partition assignment");
        Assert.assertEquals(3L, taskHandle.timesAssigned(topicPartition));
        Assert.assertEquals(3L, taskHandle.timesRevoked(topicPartition));
        Assert.assertEquals(3L, taskHandle.timesCommitted(topicPartition));
        Assert.assertEquals(3L, taskHandle.timesAssigned(topicPartition2));
        Assert.assertEquals(2L, taskHandle.timesRevoked(topicPartition2));
        Assert.assertEquals(2L, taskHandle.timesCommitted(topicPartition2));
        Assert.assertEquals(2L, taskHandle.timesAssigned(topicPartition3));
        Assert.assertEquals(1L, taskHandle.timesRevoked(topicPartition3));
        Assert.assertEquals(1L, taskHandle.timesCommitted(topicPartition3));
    }

    @Test
    public void testCooperativeConsumerPartitionAssignment() throws Exception {
        TopicPartition topicPartition = new TopicPartition("topic1", 0);
        TopicPartition topicPartition2 = new TopicPartition("topic2", 0);
        TopicPartition topicPartition3 = new TopicPartition("topic3", 0);
        Map<String, String> baseSinkConnectorProps = baseSinkConnectorProps(String.join(",", Arrays.asList("topic1", "topic2", "topic3")));
        baseSinkConnectorProps.put("consumer.override.partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
        baseSinkConnectorProps.put("consumer.override.default.api.timeout.ms", "5000");
        HashSet hashSet = new HashSet();
        TaskHandle taskHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME).taskHandle("connect-integration-test-sink-0", sinkRecord -> {
            Assert.assertTrue("Task received duplicate record from Connect", hashSet.add(Objects.toString(sinkRecord.value())));
        });
        this.connect.configureConnector(CONNECTOR_NAME, baseSinkConnectorProps);
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector tasks did not start in time.");
        Assert.assertEquals(0L, taskHandle.numPartitionsAssigned());
        HashSet hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        this.connect.kafka().createTopic("topic1", 1);
        hashSet3.add(topicPartition);
        this.connect.kafka().produce("topic1", "t1v1");
        hashSet2.add("t1v1");
        TestUtils.waitForCondition(() -> {
            return hashSet2.equals(hashSet);
        }, TASK_CONSUME_TIMEOUT_MS, "Task did not receive records in time");
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition));
        Assert.assertEquals(hashSet3, taskHandle.assignment());
        this.connect.kafka().createTopic("topic2", 1);
        hashSet3.add(topicPartition2);
        this.connect.kafka().produce("topic2", "t2v1");
        hashSet2.add("t2v1");
        this.connect.kafka().produce("topic2", "t1v2");
        hashSet2.add("t1v2");
        TestUtils.waitForCondition(() -> {
            return hashSet2.equals(hashSet);
        }, TASK_CONSUME_TIMEOUT_MS, "Task did not receive records in time");
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition));
        Assert.assertEquals(0L, taskHandle.timesCommitted(topicPartition));
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition2));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition2));
        Assert.assertEquals(hashSet3, taskHandle.assignment());
        this.connect.kafka().createTopic("topic3", 1);
        hashSet3.add(topicPartition3);
        this.connect.kafka().produce("topic3", "t3v1");
        hashSet2.add("t3v1");
        this.connect.kafka().produce("topic2", "t2v2");
        hashSet2.add("t2v2");
        this.connect.kafka().produce("topic2", "t1v3");
        hashSet2.add("t1v3");
        hashSet3.add(topicPartition3);
        TestUtils.waitForCondition(() -> {
            return hashSet2.equals(hashSet);
        }, TASK_CONSUME_TIMEOUT_MS, "Task did not receive records in time");
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition));
        Assert.assertEquals(0L, taskHandle.timesCommitted(topicPartition));
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition2));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition2));
        Assert.assertEquals(0L, taskHandle.timesCommitted(topicPartition2));
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition3));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition3));
        Assert.assertEquals(hashSet3, taskHandle.assignment());
        this.connect.kafka().deleteTopic("topic1");
        hashSet3.remove(topicPartition);
        this.connect.kafka().produce("topic3", "t3v2");
        hashSet2.add("t3v2");
        this.connect.kafka().produce("topic2", "t2v3");
        hashSet2.add("t2v3");
        TestUtils.waitForCondition(() -> {
            return hashSet2.equals(hashSet) && hashSet3.equals(taskHandle.assignment());
        }, TASK_CONSUME_TIMEOUT_MS, "Timed out while waiting for task to receive records and updated topic partition assignment");
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition));
        Assert.assertEquals(1L, taskHandle.timesRevoked(topicPartition));
        Assert.assertEquals(1L, taskHandle.timesCommitted(topicPartition));
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition2));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition2));
        Assert.assertEquals(0L, taskHandle.timesCommitted(topicPartition2));
        Assert.assertEquals(1L, taskHandle.timesAssigned(topicPartition3));
        Assert.assertEquals(0L, taskHandle.timesRevoked(topicPartition3));
        Assert.assertEquals(0L, taskHandle.timesCommitted(topicPartition3));
    }

    private Map<String, String> baseSinkConnectorProps(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put("topics", str);
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        return hashMap;
    }
}
