package org.apache.kafka.connect.integration;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.class */
public class ConnectWorkerIntegrationTest {
    private static final int NUM_TOPIC_PARTITIONS = 3;
    private static final int NUM_WORKERS = 3;
    private static final String CONNECTOR_NAME = "simple-source";
    private EmbeddedConnectCluster.Builder connectBuilder;
    private EmbeddedConnectCluster connect;
    Map<String, String> workerProps = new HashMap();
    Properties brokerProps = new Properties();
    private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);
    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    private static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);

    @Before
    public void setup() throws IOException {
        this.workerProps.put("offset.flush.interval.ms", String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
        this.workerProps.put("connector.client.config.override.policy", "All");
        this.brokerProps.put("auto.create.topics.enable", String.valueOf(false));
        this.connectBuilder = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(3).workerProps(this.workerProps).brokerProps(this.brokerProps).maskExitProcedures(true);
    }

    @After
    public void close() {
        this.connect.stop();
    }

    @Test
    public void testAddAndRemoveWorker() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        int i = 4;
        this.connect.kafka().createTopic("test-topic", 3);
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(4));
        hashMap.put("throughput", String.valueOf(1));
        hashMap.put("messages.per.poll", String.valueOf(10));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        TestUtils.waitForCondition(() -> {
            return assertWorkersUp(3).orElse(false).booleanValue();
        }, WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
        this.connect.configureConnector(CONNECTOR_NAME, hashMap);
        TestUtils.waitForCondition(() -> {
            return assertConnectorAndTasksRunning(CONNECTOR_NAME, i).orElse(false).booleanValue();
        }, CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
        WorkerHandle addWorker = this.connect.addWorker();
        TestUtils.waitForCondition(() -> {
            return assertWorkersUp(4).orElse(false).booleanValue();
        }, WORKER_SETUP_DURATION_MS, "Expanded group of workers did not start in time.");
        TestUtils.waitForCondition(() -> {
            return assertConnectorAndTasksRunning(CONNECTOR_NAME, i).orElse(false).booleanValue();
        }, CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in running state.");
        Assert.assertTrue(this.connect.activeWorkers().contains(addWorker));
        this.connect.removeWorker(addWorker);
        TestUtils.waitForCondition(() -> {
            return assertWorkersUp(3).orElse(false).booleanValue() && !assertWorkersUp(4).orElse(false).booleanValue();
        }, WORKER_SETUP_DURATION_MS, "Group of workers did not shrink in time.");
        Assert.assertFalse(this.connect.activeWorkers().contains(addWorker));
    }

    @Test
    public void testRestartFailedTask() throws Exception {
        this.connect = this.connectBuilder.build();
        this.connect.start();
        int i = 1;
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getName());
        hashMap.put("tasks.max", Objects.toString(1));
        hashMap.put("producer.override.bootstrap.servers", "nobrokerrunningatthisaddress");
        TestUtils.waitForCondition(() -> {
            return assertWorkersUp(3).orElse(false).booleanValue();
        }, WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
        this.connect.configureConnector(CONNECTOR_NAME, hashMap);
        TestUtils.waitForCondition(() -> {
            return assertConnectorTasksFailed(CONNECTOR_NAME, i).orElse(false).booleanValue();
        }, CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not fail in time");
        hashMap.remove("producer.override.bootstrap.servers");
        this.connect.configureConnector(CONNECTOR_NAME, hashMap);
        this.connect.executePost(this.connect.endpointForResource(String.format("connectors/%s/tasks/0/restart", CONNECTOR_NAME)), "", Collections.emptyMap());
        TestUtils.waitForCondition(() -> {
            return assertConnectorAndTasksRunning(CONNECTOR_NAME, i).orElse(false).booleanValue();
        }, CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in running state.");
    }

    @Test
    public void testBrokerCoordinator() throws Exception {
        this.workerProps.put("scheduled.rebalance.max.delay.ms", String.valueOf(5000));
        this.connect = this.connectBuilder.workerProps(this.workerProps).build();
        this.connect.start();
        int i = 4;
        this.connect.kafka().createTopic("test-topic", 3);
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(4));
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, "test-topic");
        hashMap.put("throughput", String.valueOf(1));
        hashMap.put("messages.per.poll", String.valueOf(10));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        TestUtils.waitForCondition(() -> {
            return assertWorkersUp(3).orElse(false).booleanValue();
        }, WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
        this.connect.configureConnector(CONNECTOR_NAME, hashMap);
        TestUtils.waitForCondition(() -> {
            return assertConnectorAndTasksRunning(CONNECTOR_NAME, i).orElse(false).booleanValue();
        }, CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
        this.connect.kafka().stopOnlyKafka();
        TestUtils.waitForCondition(() -> {
            return assertWorkersUp(3).orElse(false).booleanValue();
        }, WORKER_SETUP_DURATION_MS, "Group of workers did not remain the same after broker shutdown");
        Thread.sleep(TimeUnit.SECONDS.toMillis(10L));
        this.connect.kafka().startOnlyKafkaOnSamePorts();
        Thread.sleep(TimeUnit.SECONDS.toMillis(10L));
        TestUtils.waitForCondition(() -> {
            return assertWorkersUp(3).orElse(false).booleanValue();
        }, WORKER_SETUP_DURATION_MS, "Group of workers did not remain the same within the designated time.");
        Thread.sleep(TimeUnit.SECONDS.toMillis(10L));
        TestUtils.waitForCondition(() -> {
            return assertConnectorAndTasksRunning(CONNECTOR_NAME, i).orElse(false).booleanValue();
        }, CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
    }

    private Optional<Boolean> assertWorkersUp(int i) {
        try {
            return Optional.of(Boolean.valueOf(this.connect.activeWorkers().size() >= i));
        } catch (Exception e) {
            log.error("Could not check active workers.", e);
            return Optional.empty();
        }
    }

    private Optional<Boolean> assertConnectorAndTasksRunning(String str, int i) {
        return assertConnectorState(str, AbstractStatus.State.RUNNING, i, AbstractStatus.State.RUNNING);
    }

    private Optional<Boolean> assertConnectorTasksFailed(String str, int i) {
        return assertConnectorState(str, AbstractStatus.State.RUNNING, i, AbstractStatus.State.FAILED);
    }

    private Optional<Boolean> assertConnectorState(String str, AbstractStatus.State state, int i, AbstractStatus.State state2) {
        try {
            ConnectorStateInfo connectorStatus = this.connect.connectorStatus(str);
            return Optional.of(Boolean.valueOf(connectorStatus != null && connectorStatus.connector().state().equals(state.toString()) && connectorStatus.tasks().size() == i && connectorStatus.tasks().stream().allMatch(taskState -> {
                return taskState.state().equals(state2.toString());
            })));
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return Optional.empty();
        }
    }
}
