package org.apache.kafka.connect.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.class */
public class ConnectorRestartApiIntegrationTest {
    private static final int ONE_WORKER = 1;
    private static final int NUM_TASKS = 4;
    private static final int MESSAGES_PER_POLL = 10;
    private static final String CONNECTOR_NAME_PREFIX = "conn-";
    private static final String TOPIC_NAME = "test-topic";
    private EmbeddedConnectCluster connect;
    private ConnectorHandle connectorHandle;
    private String connectorName;

    @Rule
    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);

    @Rule
    public TestName testName = new TestName();
    private static final Logger log = LoggerFactory.getLogger(ConnectorRestartApiIntegrationTest.class);
    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
    private static Map<Integer, EmbeddedConnectCluster> connectClusterMap = new ConcurrentHashMap();

    @Before
    public void setup() {
        this.connectorName = CONNECTOR_NAME_PREFIX + this.testName.getMethodName();
        this.connectorHandle = RuntimeHandles.get().connectorHandle(this.connectorName);
    }

    private void startOrReuseConnectWithNumWorkers(int i) throws Exception {
        this.connect = connectClusterMap.computeIfAbsent(Integer.valueOf(i), num -> {
            HashMap hashMap = new HashMap();
            hashMap.put("offset.flush.interval.ms", String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
            hashMap.put("connector.client.config.override.policy", "All");
            Properties properties = new Properties();
            properties.put("auto.create.topics.enable", String.valueOf(false));
            EmbeddedConnectCluster build = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(i).workerProps(hashMap).brokerProps(properties).maskExitProcedures(true).build();
            build.start();
            return build;
        });
        this.connect.assertions().assertExactlyNumWorkersAreUp(i, "Initial group of workers did not start in time.");
    }

    @After
    public void tearDown() {
        RuntimeHandles.get().deleteConnector(this.connectorName);
    }

    @AfterClass
    public static void close() {
        connectClusterMap.values().forEach(embeddedConnectCluster -> {
            embeddedConnectCluster.stop();
        });
    }

    @Test
    public void testRestartUnknownConnectorNoParams() throws Exception {
        startOrReuseConnectWithNumWorkers(1);
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), this.connect.requestPost(this.connect.endpointForResource(String.format("connectors/%s/restart", "Unknown")), "", Collections.emptyMap()).getStatus());
    }

    @Test
    public void testRestartUnknownConnector() throws Exception {
        restartUnknownConnector(false, false);
        restartUnknownConnector(false, true);
        restartUnknownConnector(true, false);
        restartUnknownConnector(true, true);
    }

    private void restartUnknownConnector(boolean z, boolean z2) throws Exception {
        startOrReuseConnectWithNumWorkers(1);
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), this.connect.requestPost(this.connect.endpointForResource(String.format("connectors/%s/restart?onlyFailed=" + z + "&includeTasks=" + z2, "Unknown")), "", Collections.emptyMap()).getStatus());
    }

    @Test
    public void testRunningConnectorAndTasksRestartOnlyConnector() throws Exception {
        runningConnectorAndTasksRestart(false, false, 1, allTasksExpectedRestarts(0), false);
    }

    @Test
    public void testRunningConnectorAndTasksRestartBothConnectorAndTasks() throws Exception {
        runningConnectorAndTasksRestart(false, true, 1, allTasksExpectedRestarts(1), false);
    }

    @Test
    public void testRunningConnectorAndTasksRestartOnlyFailedConnectorNoop() throws Exception {
        runningConnectorAndTasksRestart(true, false, 0, allTasksExpectedRestarts(0), true);
    }

    @Test
    public void testRunningConnectorAndTasksRestartBothConnectorAndTasksNoop() throws Exception {
        runningConnectorAndTasksRestart(true, true, 0, allTasksExpectedRestarts(0), true);
    }

    @Test
    public void testFailedConnectorRestartOnlyConnector() throws Exception {
        failedConnectorRestart(false, false, 1);
    }

    @Test
    public void testFailedConnectorRestartBothConnectorAndTasks() throws Exception {
        failedConnectorRestart(false, true, 1);
    }

    @Test
    public void testFailedConnectorRestartOnlyFailedConnectorAndTasks() throws Exception {
        failedConnectorRestart(true, true, 1);
    }

    @Test
    public void testFailedTasksRestartOnlyConnector() throws Exception {
        failedTasksRestart(false, false, 1, allTasksExpectedRestarts(0), buildAllTasksToFail(), false);
    }

    @Test
    public void testFailedTasksRestartOnlyTasks() throws Exception {
        failedTasksRestart(true, true, 0, allTasksExpectedRestarts(1), buildAllTasksToFail(), false);
    }

    @Test
    public void testFailedTasksRestartWithoutIncludeTasksNoop() throws Exception {
        failedTasksRestart(true, false, 0, allTasksExpectedRestarts(0), buildAllTasksToFail(), true);
    }

    @Test
    public void testFailedTasksRestartBothConnectorAndTasks() throws Exception {
        failedTasksRestart(false, true, 1, allTasksExpectedRestarts(1), buildAllTasksToFail(), false);
    }

    @Test
    public void testOneFailedTasksRestartOnlyOneTasks() throws Exception {
        Set<String> singleton = Collections.singleton(taskId(1));
        failedTasksRestart(true, true, 0, buildExpectedTasksRestarts(singleton), singleton, false);
    }

    @Test
    public void testMultiWorkerRestartOnlyConnector() throws Exception {
        runningConnectorAndTasksRestart(false, false, 1, allTasksExpectedRestarts(0), false, 6);
    }

    @Test
    public void testMultiWorkerRestartBothConnectorAndTasks() throws Exception {
        runningConnectorAndTasksRestart(false, true, 1, allTasksExpectedRestarts(1), false, 6);
    }

    private void runningConnectorAndTasksRestart(boolean z, boolean z2, int i, Map<String, Integer> map, boolean z3) throws Exception {
        runningConnectorAndTasksRestart(z, z2, i, map, z3, 1);
    }

    private void runningConnectorAndTasksRestart(boolean z, boolean z2, int i, Map<String, Integer> map, boolean z3, int i2) throws Exception {
        startOrReuseConnectWithNumWorkers(i2);
        this.connect.configureConnector(this.connectorName, defaultSourceConnectorProps(TOPIC_NAME));
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(this.connectorName, NUM_TASKS, "Connector tasks are not all in running state.");
        StartsAndStops countsSnapshot = this.connectorHandle.startAndStopCounter().countsSnapshot();
        Map map2 = (Map) this.connectorHandle.tasks().stream().collect(Collectors.toMap((v0) -> {
            return v0.taskId();
        }, taskHandle -> {
            return taskHandle.startAndStopCounter().countsSnapshot();
        }));
        StartAndStopLatch expectedStops = this.connectorHandle.expectedStops(i, map, z2);
        StartAndStopLatch expectedStarts = this.connectorHandle.expectedStarts(i, map, z2);
        ConnectorStateInfo restartConnectorAndTasks = i2 == 1 ? this.connect.restartConnectorAndTasks(this.connectorName, z, z2, false) : this.connect.restartConnectorAndTasks(this.connectorName, z, z2, true);
        if (z3) {
            assertNoRestartingState(restartConnectorAndTasks);
        }
        Assert.assertTrue("Failed to stop connector and tasks within " + EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS + "ms", expectedStops.await(EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(this.connectorName, NUM_TASKS, "Connector tasks are not all in running state.");
        Assert.assertTrue("Failed to start connector and tasks within " + EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS + "ms", expectedStarts.await(EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
        StartsAndStops countsSnapshot2 = this.connectorHandle.startAndStopCounter().countsSnapshot();
        Assert.assertEquals(countsSnapshot.starts() + i, countsSnapshot2.starts());
        Assert.assertEquals(countsSnapshot.stops() + i, countsSnapshot2.stops());
        this.connectorHandle.tasks().forEach(taskHandle2 -> {
            StartsAndStops countsSnapshot3 = taskHandle2.startAndStopCounter().countsSnapshot();
            if (i2 == 1) {
                Assert.assertEquals(((StartsAndStops) map2.get(taskHandle2.taskId())).starts() + ((Integer) map.get(taskHandle2.taskId())).intValue(), countsSnapshot3.starts());
                Assert.assertEquals(((StartsAndStops) map2.get(taskHandle2.taskId())).stops() + ((Integer) map.get(taskHandle2.taskId())).intValue(), countsSnapshot3.stops());
            } else {
                Assert.assertTrue(countsSnapshot3.starts() >= ((StartsAndStops) map2.get(taskHandle2.taskId())).starts() + ((Integer) map.get(taskHandle2.taskId())).intValue());
                Assert.assertTrue(countsSnapshot3.stops() >= ((StartsAndStops) map2.get(taskHandle2.taskId())).stops() + ((Integer) map.get(taskHandle2.taskId())).intValue());
            }
        });
    }

    private void failedConnectorRestart(boolean z, boolean z2, int i) throws Exception {
        Map<String, Integer> allTasksExpectedRestarts = allTasksExpectedRestarts(0);
        Map<String, String> defaultSourceConnectorProps = defaultSourceConnectorProps(TOPIC_NAME);
        defaultSourceConnectorProps.put("connector.start.inject.error", "true");
        startOrReuseConnectWithNumWorkers(1);
        this.connect.configureConnector(this.connectorName, defaultSourceConnectorProps);
        this.connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(this.connectorName, 0, "Connector or tasks are in running state.");
        StartsAndStops countsSnapshot = this.connectorHandle.startAndStopCounter().countsSnapshot();
        StartAndStopLatch expectedStarts = this.connectorHandle.expectedStarts(i, allTasksExpectedRestarts, z2);
        this.connect.restartConnectorAndTasks(this.connectorName, z, z2, false);
        this.connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(this.connectorName, 0, "Connector tasks are not all in running state.");
        Assert.assertTrue("Failed to start connector and tasks after coordinator failure within " + EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS + "ms", expectedStarts.await(EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
        Assert.assertEquals(countsSnapshot.starts() + i, this.connectorHandle.startAndStopCounter().countsSnapshot().starts());
    }

    private void failedTasksRestart(boolean z, boolean z2, int i, Map<String, Integer> map, Set<String> set, boolean z3) throws Exception {
        Map<String, String> defaultSourceConnectorProps = defaultSourceConnectorProps(TOPIC_NAME);
        set.forEach(str -> {
        });
        startOrReuseConnectWithNumWorkers(1);
        this.connect.configureConnector(this.connectorName, defaultSourceConnectorProps);
        this.connect.assertions().assertConnectorIsRunningAndNumTasksHaveFailed(this.connectorName, NUM_TASKS, set.size(), "Connector tasks are in running state.");
        StartsAndStops countsSnapshot = this.connectorHandle.startAndStopCounter().countsSnapshot();
        Map map2 = (Map) this.connectorHandle.tasks().stream().collect(Collectors.toMap((v0) -> {
            return v0.taskId();
        }, taskHandle -> {
            return taskHandle.startAndStopCounter().countsSnapshot();
        }));
        StartAndStopLatch expectedStops = this.connectorHandle.expectedStops(i, map, z2);
        StartAndStopLatch expectedStarts = this.connectorHandle.expectedStarts(i, map, z2);
        ConnectorStateInfo restartConnectorAndTasks = this.connect.restartConnectorAndTasks(this.connectorName, z, z2, false);
        if (z3) {
            assertNoRestartingState(restartConnectorAndTasks);
        }
        Assert.assertTrue("Failed to stop connector and tasks within " + EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS + "ms", expectedStops.await(EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
        this.connect.assertions().assertConnectorIsRunningAndNumTasksHaveFailed(this.connectorName, NUM_TASKS, set.size(), "Connector tasks are not all in running state.");
        Assert.assertTrue("Failed to start connector and tasks within " + EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS + "ms", expectedStarts.await(EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
        StartsAndStops countsSnapshot2 = this.connectorHandle.startAndStopCounter().countsSnapshot();
        Assert.assertEquals(countsSnapshot.starts() + i, countsSnapshot2.starts());
        Assert.assertEquals(countsSnapshot.stops() + i, countsSnapshot2.stops());
        this.connectorHandle.tasks().forEach(taskHandle2 -> {
            StartsAndStops countsSnapshot3 = taskHandle2.startAndStopCounter().countsSnapshot();
            Assert.assertEquals(((StartsAndStops) map2.get(taskHandle2.taskId())).starts() + ((Integer) map.get(taskHandle2.taskId())).intValue(), countsSnapshot3.starts());
            Assert.assertEquals(((StartsAndStops) map2.get(taskHandle2.taskId())).stops() + ((Integer) map.get(taskHandle2.taskId())).intValue(), countsSnapshot3.stops());
        });
    }

    private void assertNoRestartingState(ConnectorStateInfo connectorStateInfo) {
        Assert.assertNotEquals(AbstractStatus.State.RESTARTING.name(), connectorStateInfo.connector().state());
        connectorStateInfo.tasks().forEach(taskState -> {
            Assert.assertNotEquals(AbstractStatus.State.RESTARTING.name(), taskState.state());
        });
    }

    private Set<String> buildAllTasksToFail() {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < NUM_TASKS; i++) {
            hashSet.add(taskId(i));
        }
        return hashSet;
    }

    private Map<String, Integer> allTasksExpectedRestarts(int i) {
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < NUM_TASKS; i2++) {
            hashMap.put(taskId(i2), Integer.valueOf(i));
        }
        return hashMap;
    }

    private Map<String, Integer> buildExpectedTasksRestarts(Set<String> set) {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < NUM_TASKS; i++) {
            String taskId = taskId(i);
            hashMap.put(taskId, Integer.valueOf(set.contains(taskId) ? 1 : 0));
        }
        return hashMap;
    }

    private String taskId(int i) {
        return this.connectorName + "-" + i;
    }

    private Map<String, String> defaultSourceConnectorProps(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(NUM_TASKS));
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, str);
        hashMap.put("throughput", "10");
        hashMap.put("messages.per.poll", String.valueOf(MESSAGES_PER_POLL));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
        hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        return hashMap;
    }
}
