package org.apache.kafka.connect.runtime.distributed;

import com.fasterxml.jackson.core.type.TypeReference;
import io.confluent.logevents.connect.LogEventsEmitter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.crypto.SecretKey;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.CloseableConnectorContext;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.RestartPlan;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigDecorator;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

@PrepareForTest({DistributedHerder.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.class */
public class DistributedHerderTest {
    // ---------------------------------------------------------------------------------------
    // Shared static fixtures. Every constant below that has no initializer is a blank final,
    // so it has to be assigned in a static initializer block; that block is not among the
    // lines visible in this part of the file.
    // ---------------------------------------------------------------------------------------

    // Worker-level configuration; setUp() wraps it in a DistributedConfig for the herder.
    private static final Map<String, String> HERDER_CONFIG = new HashMap();
    // Passed to the DistributedHerder constructor in setUp() (after the group member mock).
    private static final String MEMBER_URL = "memberUrl";
    // Names of the two connectors the tests operate on.
    private static final String CONN1 = "sourceA";
    private static final String CONN2 = "sourceB";
    // Task ids handed out in rebalance assignments.
    // NOTE(review): presumably all three belong to CONN1 -- confirm against the static initializer.
    private static final ConnectorTaskId TASK0;
    private static final ConnectorTaskId TASK1;
    private static final ConnectorTaskId TASK2;
    private static final Integer MAX_TASKS;
    // Configuration of CONN1; setUp() builds conn1SinkConfig from it.
    private static final Map<String, String> CONN1_CONFIG;
    private static final String FOO_TOPIC = "foo";
    private static final String BAR_TOPIC = "bar";
    private static final String BAZ_TOPIC = "baz";
    // Modified configuration of CONN1; setUp() builds conn1SinkConfigUpdated from it.
    private static final Map<String, String> CONN1_CONFIG_UPDATED;
    private static final ConfigInfos CONN1_CONFIG_INFOS;
    // Configuration submitted by the connector-creation tests, plus the validation results the
    // mocked validateConnectorConfig() reports back (a passing and a failing variant).
    private static final Map<String, String> CONN2_CONFIG;
    private static final ConfigInfos CONN2_CONFIG_INFOS;
    private static final ConfigInfos CONN2_INVALID_CONFIG_INFOS;
    private static final Map<String, String> TASK_CONFIG;
    // Returned by the task-reconfiguration answers the tests register.
    private static final List<Map<String, String>> TASK_CONFIGS;
    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP;
    // Cluster configuration snapshots served through expectConfigRefreshAndSnapshot(...).
    private static final ClusterConfigState SNAPSHOT;
    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1;
    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG;
    // Identity arguments passed to the DistributedHerder constructor in setUp().
    private static final String WORKER_ID = "localhost:8083";
    private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
    private static final Runnable EMPTY_RUNNABLE;

    // ---------------------------------------------------------------------------------------
    // Collaborators. Fields carrying PowerMock's @Mock annotation are mocks; note that setUp()
    // additionally overwrites `worker` and `plugins` with mocks it creates explicitly.
    // ---------------------------------------------------------------------------------------

    @Mock
    private ConfigBackingStore configBackingStore;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private WorkerGroupMember member;
    // Mock clock, recreated in setUp(); tests advance it with time.sleep(...).
    private MockTime time;
    // The herder under test: a partial mock built in setUp().
    private DistributedHerder herder;
    // Metrics backed by the mock clock; stopped in tearDown().
    private MockConnectMetrics metrics;

    @Mock
    private Worker worker;

    @Mock
    private WorkerConfigTransformer transformer;

    @Mock
    private WorkerConfigDecorator decorator;

    @Mock
    private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;

    @Mock
    private Plugins plugins;

    @Mock
    private RestClient restClient;
    // Listener instances created from the herder in setUp().
    private ConfigBackingStore.UpdateListener configUpdateListener;
    private WorkerRebalanceListener rebalanceListener;
    // Shut down (and cleared) by tearDown() when a test has assigned it.
    private ExecutorService herderExecutor;
    private Future<?> herderFuture;
    // Sink connector configs derived from CONN1_CONFIG / CONN1_CONFIG_UPDATED in setUp().
    private SinkConnectorConfig conn1SinkConfig;
    private SinkConnectorConfig conn1SinkConfigUpdated;
    // Protocol version a test is running with; setUp() resets it to 0.
    private short connectProtocolVersion;
    // Counted down by the AutoCloseable that setUp() hands to the herder.
    private CountDownLatch shutdownCalled = new CountDownLatch(1);
    private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$BogusSourceConnector.class */
    /**
     * Placeholder {@link SourceConnector} subtype for this test class. It is abstract and declares
     * no members of its own; the compiler-supplied default constructor of a private class is
     * itself private, so no explicit constructor is needed.
     */
    private abstract class BogusSourceConnector extends SourceConnector {
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$BogusSourceTask.class */
    /**
     * Placeholder {@link SourceTask} subtype for this test class. It is abstract and declares no
     * members of its own; the compiler-supplied default constructor of a private class is itself
     * private, so no explicit constructor is needed.
     */
    private abstract class BogusSourceTask extends SourceTask {
    }

    /**
     * Builds the fixture shared by all tests.
     *
     * <p>Creates a mock clock and metrics, replaces the {@code worker} and {@code plugins} fields
     * with explicitly created mocks, constructs the {@link DistributedHerder} under test as a
     * PowerMock partial mock (only the methods named in the array are mocked), creates the
     * herder's config-update and rebalance listeners, and registers the expectations every test
     * relies on: {@code connectorType(...)} always reports {@link ConnectorType#SOURCE}, and the
     * two private status-cleanup methods may be invoked any number of times.
     *
     * @throws Exception if PowerMock fails to create the partial mock or to register the
     *                   private-method expectations
     */
    @Before
    public void setUp() throws Exception {
        time = new MockTime();
        metrics = new MockConnectMetrics(time);

        // Replaces whatever the @Mock annotation injected with an explicitly created mock.
        worker = PowerMock.createMock(Worker.class);
        EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);

        // Shutdown hook handed to the herder. Kept as a lambda (not a bound method reference) so
        // the latch field is read when the hook runs rather than when the hook is created.
        AutoCloseable uponShutdown = () -> shutdownCalled.countDown();

        connectProtocolVersion = (short) 0;

        herder = PowerMock.createPartialMock(
                DistributedHerder.class,
                new String[]{
                    "connectorType",
                    "updateDeletedConnectorStatus",
                    "updateDeletedTaskStatus",
                    "validateConnectorConfig",
                    "buildRestartPlan",
                    "recordRestarting"
                },
                new Object[]{
                    new DistributedConfig(HERDER_CONFIG),
                    worker,
                    WORKER_ID,
                    KAFKA_CLUSTER_ID,
                    statusBackingStore,
                    configBackingStore,
                    member,
                    MEMBER_URL,
                    restClient,
                    metrics,
                    time,
                    noneConnectorClientConfigOverridePolicy,
                    new AutoCloseable[]{uponShutdown}
                });

        // Both listeners are inner classes of the herder, so they are created through a qualified
        // `outer.new Inner(...)` expression; passing the herder as a constructor argument is not
        // valid source for an inner class.
        configUpdateListener = herder.new ConfigUpdateListener();
        rebalanceListener = herder.new RebalanceListener(time);

        // Replaces the @Mock-injected plugins instance as well.
        plugins = PowerMock.createMock(Plugins.class);
        conn1SinkConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
        conn1SinkConfigUpdated = new SinkConnectorConfig(plugins, CONN1_CONFIG_UPDATED);

        EasyMock.expect(herder.connectorType(EasyMock.<Map<String, String>>anyObject()))
                .andReturn(ConnectorType.SOURCE)
                .anyTimes();
        PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
        PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes();
    }

    /**
     * Releases per-test resources: stops the metrics if they were created and, when a test
     * started a herder executor, shuts it down immediately and clears the field.
     */
    @After
    public void tearDown() {
        if (metrics != null) {
            metrics.stop();
        }

        ExecutorService executor = herderExecutor;
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
        herderExecutor = null;
    }

    /**
     * One tick on a member running protocol version 0 that is assigned CONN1 and TASK1: the
     * connector and the task must both be started, and the rebalance statistics must reflect
     * the tick.
     */
    @Test
    public void testJoinAssignment() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 0);

        // Assignment delivered by the rebalance, followed by a config refresh.
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectConfigRefreshAndSnapshot(SNAPSHOT);

        // Starting the connector completes its callback right away with STARTED.
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(onStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);

        worker.startSourceTask(
                EasyMock.eq(TASK1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        time.sleep(1000L);
        assertStatistics(3, 1, 100.0, 1000.0);

        PowerMock.verifyAll();
    }

    /**
     * Two ticks under protocol version 0. The first rebalance assigns CONN1 and TASK1 and both are
     * started; after the second rebalance only the connector is expected to be started again.
     * Rebalance statistics are checked before the first tick and after each tick.
     */
    @Test
    public void testRebalance() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 0);

        // ---- first rebalance: connector and task are started ----
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Capture<Callback<TargetState>> firstStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(firstStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            firstStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        worker.startSourceTask(
                EasyMock.eq(TASK1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // ---- second rebalance: only the connector start is expected ----
        expectRebalance(
                (Collection<String>) Arrays.asList(CONN1),
                Arrays.asList(TASK1),
                (short) 0,
                1L,
                Arrays.asList(CONN1),
                Arrays.<ConnectorTaskId>asList());
        Capture<Callback<TargetState>> secondStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(secondStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            secondStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        // Nothing has happened yet.
        time.sleep(1000L);
        assertStatistics(0, 0, 0.0, Double.POSITIVE_INFINITY);

        herder.tick();
        time.sleep(2000L);
        assertStatistics(3, 1, 100.0, 2000.0);

        herder.tick();
        time.sleep(3000L);
        assertStatistics(3, 2, 100.0, 3000.0);

        PowerMock.verifyAll();
    }

    /**
     * Protocol version 1, seen from a member that has just joined: the first rebalance hands it
     * nothing, the second one hands it CONN1 and TASK1, which are then started. Rebalance
     * statistics are checked before the first tick and after each tick.
     */
    @Test
    public void testIncrementalCooperativeRebalanceForNewMember() throws Exception {
        connectProtocolVersion = (short) 1;
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 1);

        // ---- first rebalance: empty assignment ----
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // ---- second rebalance: CONN1 and TASK1 arrive and are started ----
        expectRebalance(
                Collections.emptyList(),
                Collections.emptyList(),
                (short) 0,
                1L,
                Arrays.asList(CONN1),
                Arrays.asList(TASK1),
                0);
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(onStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        worker.startSourceTask(
                EasyMock.eq(TASK1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        time.sleep(1000L);
        assertStatistics(0, 0, 0.0, Double.POSITIVE_INFINITY);

        herder.tick();
        time.sleep(2000L);
        assertStatistics(3, 1, 100.0, 2000.0);

        herder.tick();
        time.sleep(3000L);
        assertStatistics(3, 2, 100.0, 3000.0);

        PowerMock.verifyAll();
    }

    /**
     * Protocol version 1, seen from a member that is already in the group: the first rebalance
     * is followed by an expected rejoin request, and the second rebalance leaves the member with
     * an empty assignment. The herder's config state is seeded with {@code SNAPSHOT} directly
     * instead of through a config refresh expectation.
     */
    @Test
    public void testIncrementalCooperativeRebalanceForExistingMember() {
        connectProtocolVersion = (short) 1;
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 1);

        // ---- first rebalance, after which the member must ask to rejoin ----
        expectRebalance(
                Arrays.asList(CONN1),
                Arrays.asList(TASK1),
                (short) 0,
                1L,
                Collections.emptyList(),
                Collections.emptyList(),
                0);
        member.requestRejoin();
        PowerMock.expectLastCall();

        // ---- second rebalance: nothing assigned ----
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.configState = SNAPSHOT;
        time.sleep(1000L);
        assertStatistics(0, 0, 0.0, Double.POSITIVE_INFINITY);

        herder.tick();
        time.sleep(2000L);
        assertStatistics(3, 1, 100.0, 2000.0);

        herder.tick();
        time.sleep(3000L);
        assertStatistics(3, 2, 100.0, 3000.0);

        PowerMock.verifyAll();
    }

    /**
     * Protocol version 1 with a delayed rebalance. The first rebalance assigns TASK2 and carries
     * a trailing argument of 10000 (presumably the rebalance delay -- confirm against
     * expectRebalance). The mocked poll advances the clock by 9900, after which a rejoin request
     * is expected; the following rebalance assigns CONN1 and TASK1, which are started.
     */
    @Test
    public void testIncrementalCooperativeRebalanceWithDelay() throws Exception {
        connectProtocolVersion = (short) 1;
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 1);

        // ---- first rebalance: TASK2 only ----
        expectRebalance(
                Collections.emptyList(),
                Collections.emptyList(),
                (short) 0,
                1L,
                Collections.emptyList(),
                Arrays.asList(TASK2),
                10000);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        worker.startSourceTask(
                EasyMock.eq(TASK2),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);

        // Polling consumes most of the 10000 on the mock clock.
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall().andAnswer(() -> {
            time.sleep(9900L);
            return null;
        });

        member.requestRejoin();
        PowerMock.expectLastCall();

        // ---- second rebalance: CONN1 and TASK1 arrive and are started ----
        expectRebalance(
                Collections.emptyList(),
                Collections.emptyList(),
                (short) 0,
                1L,
                Arrays.asList(CONN1),
                Arrays.asList(TASK1),
                0);
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(onStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        worker.startSourceTask(
                EasyMock.eq(TASK1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        time.sleep(1000L);
        assertStatistics(0, 0, 0.0, Double.POSITIVE_INFINITY);

        // Two back-to-back ticks, then a single statistics check.
        herder.tick();
        herder.tick();
        time.sleep(2000L);
        assertStatistics(3, 2, 100.0, 2000.0);

        PowerMock.verifyAll();
    }

    /**
     * Same sequence as a regular two-step rebalance under protocol version 0, except that the
     * task reconfiguration registered for the second step is the {@code (false, null, null)}
     * variant. Statistics are checked after each tick.
     */
    @Test
    public void testRebalanceFailedConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 0);

        // ---- first rebalance: connector and task are started ----
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Capture<Callback<TargetState>> firstStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(firstStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            firstStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        worker.startSourceTask(
                EasyMock.eq(TASK1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // ---- second rebalance: connector restarts, reconfiguration uses the "false" variant ----
        expectRebalance(
                (Collection<String>) Arrays.asList(CONN1),
                Arrays.asList(TASK1),
                (short) 0,
                1L,
                Arrays.asList(CONN1),
                Arrays.<ConnectorTaskId>asList());
        Capture<Callback<TargetState>> secondStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(secondStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            secondStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(false, null, null);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        time.sleep(1000L);
        assertStatistics(3, 1, 100.0, 1000.0);

        herder.tick();
        time.sleep(2000L);
        assertStatistics(3, 2, 100.0, 2000.0);

        PowerMock.verifyAll();
    }

    /** Runs the revoke-and-reassign scenario without the extra incomplete rebalance. */
    @Test
    public void testRevoke() throws TimeoutException {
        final boolean incompleteRebalance = false;
        revokeAndReassign(incompleteRebalance);
    }

    /** Runs the revoke-and-reassign scenario with the extra incomplete rebalance enabled. */
    @Test
    public void testIncompleteRebalanceBeforeRevoke() throws TimeoutException {
        final boolean incompleteRebalance = true;
        revokeAndReassign(incompleteRebalance);
    }

    /**
     * Shared scenario for the revoke tests, run under protocol version 1.
     *
     * <p>Sequence of expectations: an initial rebalance assigns CONN1 and TASK1 (both started);
     * optionally an extra rebalance at the next config offset with an empty assignment, a config
     * refresh and a rejoin request; then a rebalance followed by a rejoin request; and finally a
     * rebalance that assigns CONN1 again, which is restarted. The herder is ticked three times,
     * or four when the optional step is enabled.
     *
     * @param z {@code true} to insert the extra, incomplete rebalance before the revocation
     * @throws TimeoutException declared by the original signature; not thrown by the code
     *                          visible here
     */
    public void revokeAndReassign(boolean z) throws TimeoutException {
        final boolean incompleteRebalance = z;
        connectProtocolVersion = (short) 1;
        int configOffset = 1;

        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion()))
                .andStubReturn(Short.valueOf(connectProtocolVersion));

        // ---- initial assignment; mutable copies are passed instead of the singleton lists ----
        expectRebalance(
                configOffset,
                new ArrayList<>(Collections.singletonList(CONN1)),
                new ArrayList<>(Collections.singletonList(TASK1)));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Capture<Callback<TargetState>> firstStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(firstStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            firstStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        worker.startSourceTask(
                EasyMock.eq(TASK1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // ---- optional: rebalance at the next offset with nothing assigned ----
        if (incompleteRebalance) {
            configOffset++;
            expectRebalance(configOffset, Arrays.<String>asList(), Arrays.<ConnectorTaskId>asList());
            expectConfigRefreshAndSnapshot(SNAPSHOT);
            member.requestRejoin();
            PowerMock.expectLastCall();
        }

        // ---- rebalance followed by a rejoin request ----
        expectRebalance(
                (Collection<String>) Arrays.asList(CONN1),
                Arrays.<ConnectorTaskId>asList(),
                (short) 0,
                configOffset,
                Arrays.<String>asList(),
                Arrays.<ConnectorTaskId>asList());
        if (incompleteRebalance) {
            // Serve a snapshot whose offset matches the bumped config offset.
            expectConfigRefreshAndSnapshot(new ClusterConfigState(
                    configOffset,
                    (SessionKey) null,
                    Collections.singletonMap(CONN1, 3),
                    Collections.singletonMap(CONN1, CONN1_CONFIG),
                    Collections.singletonMap(CONN1, TargetState.STARTED),
                    TASK_CONFIGS_MAP,
                    Collections.emptyMap(),
                    Collections.emptyMap(),
                    Collections.emptySet(),
                    Collections.emptySet()));
        }
        member.requestRejoin();
        PowerMock.expectLastCall();

        // ---- final rebalance: CONN1 is assigned again and restarted ----
        expectRebalance(configOffset, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
        Capture<Callback<TargetState>> secondStart = EasyMock.newCapture();
        worker.startConnector(
                EasyMock.eq(CONN1),
                EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED),
                EasyMock.capture(secondStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            secondStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        if (incompleteRebalance) {
            herder.tick();
        }
        herder.tick();
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * {@code halt()} is expected to stop the connector and the task the worker reports, and to
     * stop the group member, both backing stores and the worker itself.
     */
    @Test
    public void testHaltCleansUpWorker() {
        // The worker reports one running connector and one running task.
        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
        worker.stopAndAwaitConnector(CONN1);
        PowerMock.expectLastCall();
        EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
        worker.stopAndAwaitTask(TASK1);
        PowerMock.expectLastCall();

        // Every collaborator gets stopped.
        member.stop();
        PowerMock.expectLastCall();
        configBackingStore.stop();
        PowerMock.expectLastCall();
        statusBackingStore.stop();
        PowerMock.expectLastCall();
        worker.stop();
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.halt();

        PowerMock.verifyAll();
    }

    /**
     * Submitting CONN2's configuration on the member with id "leader": validation succeeds, the
     * configuration is written to the config backing store, and the caller's callback receives
     * a "created" result describing CONN2 as a source connector with no tasks.
     */
    @Test
    public void testCreateConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        // Validation is mocked and reports the passing result through its callback.
        Capture<Callback<ConfigInfos>> validation = EasyMock.newCapture();
        herder.validateConnectorConfig(EasyMock.eq(CONN2_CONFIG), EasyMock.capture(validation));
        PowerMock.expectLastCall().andAnswer(() -> {
            validation.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
            return null;
        });

        // The config is persisted and the caller is told the connector was created.
        configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
        PowerMock.expectLastCall();
        ConnectorInfo createdInfo =
                new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.emptyList(), ConnectorType.SOURCE);
        putConnectorCallback.onCompletion(null, new Herder.Created<>(true, createdInfo));
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Expectations for the second tick.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
        herder.tick();
        herder.tick();
        time.sleep(1000L);
        assertStatistics(3, 1, 100.0, 1000.0);

        PowerMock.verifyAll();
    }

    /**
     * Submitting CONN2's configuration when the config backing store throws on write: the
     * caller's callback must be completed with a {@link ConnectException} and a null result.
     */
    @Test
    public void testCreateConnectorConfigBackingStoreError() {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        // Validation itself passes.
        Capture<Callback<ConfigInfos>> validation = EasyMock.newCapture();
        herder.validateConnectorConfig(EasyMock.eq(CONN2_CONFIG), EasyMock.capture(validation));
        PowerMock.expectLastCall().andAnswer(() -> {
            validation.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
            return null;
        });

        // Persisting the config fails; the failure has to reach the caller's callback.
        configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
        PowerMock.expectLastCall()
                .andThrow(new ConnectException("Error writing connector configuration to Kafka"));
        putConnectorCallback.onCompletion(EasyMock.anyObject(ConnectException.class), EasyMock.isNull());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Expectations for the second tick.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
        herder.tick();
        herder.tick();
        time.sleep(1000L);
        assertStatistics(3, 1, 100.0, 1000.0);

        PowerMock.verifyAll();
    }

    /**
     * Submitting CONN2's configuration with the "name" entry removed: the mocked validation
     * reports the invalid result, and the caller's callback must be completed with a
     * {@link BadRequestException} and a null result. No write to the config backing store is
     * expected.
     */
    @Test
    public void testCreateConnectorFailedValidation() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(member.currentProtocolVersion())).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);

        // Copy of CONN2's config without its name.
        Map<String, String> namelessConfig = new HashMap<>(CONN2_CONFIG);
        namelessConfig.remove("name");

        member.wakeup();
        PowerMock.expectLastCall();

        // Validation is mocked and reports the failing result through its callback.
        Capture<Callback<ConfigInfos>> validation = EasyMock.newCapture();
        herder.validateConnectorConfig(EasyMock.eq(namelessConfig), EasyMock.capture(validation));
        PowerMock.expectLastCall().andAnswer(() -> {
            validation.getValue().onCompletion(null, CONN2_INVALID_CONFIG_INFOS);
            return null;
        });

        // Whatever error the herder reports is captured for inspection below.
        Capture<Throwable> reportedError = EasyMock.newCapture();
        putConnectorCallback.onCompletion(EasyMock.capture(reportedError), EasyMock.isNull());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Expectations for the second tick.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, namelessConfig, false, putConnectorCallback);
        herder.tick();
        herder.tick();

        Assert.assertTrue(reportedError.hasCaptured());
        Assert.assertTrue(reportedError.getValue() instanceof BadRequestException);

        time.sleep(1000L);
        assertStatistics(3, 1, 100.0, 1000.0);

        PowerMock.verifyAll();
    }

    /**
     * Validates a sink connector config whose "name" is "test-group" and expects exactly one
     * error message on "name" reporting a conflict with the worker group "connect-test-group".
     */
    @Test
    public void testConnectorNameConflictsWithWorkerGroupId() {
        Map conflictingConfig = new HashMap(CONN2_CONFIG);
        conflictingConfig.put("name", "test-group");

        expectConfigDecoration();

        ConfigValue nameValue = (ConfigValue) herder
                .validateSinkConnectorConfig(ConnectorConfig.configDef(), conflictingConfig)
                .get("name");

        Assert.assertNotNull(nameValue.errorMessages());
        Assert.assertEquals(
                Collections.singletonList("Consumer group for sink connector named test-group conflicts with Connect worker group connect-test-group"),
                nameValue.errorMessages());
    }

    /**
     * Using the herder returned by {@code exactlyOnceHerder()}, requires exactly-once support
     * from a connector that reports {@code ExactlyOnceSupport.SUPPORTED} and expects no error
     * messages on "exactly.once.support".
     */
    @Test
    public void testExactlyOnceSourceSupportValidation() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        connectorProps.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        EasyMock.expect(connector.exactlyOnceSupport(EasyMock.eq(connectorProps))).andReturn(ExactlyOnceSupport.SUPPORTED);
        PowerMock.replayAll(connector);

        ConfigValue supportValue = (ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("exactly.once.support");
        Assert.assertEquals(Collections.emptyList(), supportValue.errorMessages());

        PowerMock.verifyAll();
    }

    /**
     * Requires exactly-once support from a connector that reports
     * {@code ExactlyOnceSupport.UNSUPPORTED} and expects the single "does not support
     * exactly-once semantics" error on "exactly.once.support".
     */
    @Test
    public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        connectorProps.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        EasyMock.expect(connector.exactlyOnceSupport(EasyMock.eq(connectorProps))).andReturn(ExactlyOnceSupport.UNSUPPORTED);
        PowerMock.replayAll(connector);

        ConfigValue supportValue = (ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("exactly.once.support");
        Assert.assertEquals(
                Collections.singletonList("The connector does not support exactly-once semantics with the provided configuration."),
                supportValue.errorMessages());

        PowerMock.verifyAll();
    }

    /**
     * Requires exactly-once support from a connector whose {@code exactlyOnceSupport} call
     * returns {@code null}, and expects exactly one error on "exactly.once.support" stating that
     * the connector does not implement the preflight validation API.
     */
    @Test
    public void testExactlyOnceSourceSupportValidationOnUnknownConnector() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        connectorProps.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        // Return a plain null: casting it to Object does not match the ExactlyOnceSupport return
        // type that the sibling tests pass to andReturn(...).
        EasyMock.expect(connector.exactlyOnceSupport(EasyMock.eq(connectorProps))).andReturn(null);
        PowerMock.replayAll(connector);

        List errorMessages = ((ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("exactly.once.support")).errorMessages();

        Assert.assertFalse(errorMessages.isEmpty());
        String firstError = (String) errorMessages.get(0);
        Assert.assertTrue(
                "Error message did not contain expected text: " + firstError,
                firstError.contains("The connector does not implement the API required for preflight validation of exactly-once source support."));
        Assert.assertEquals(1L, errorMessages.size());

        PowerMock.verifyAll();
    }

    /**
     * Makes the connector's {@code exactlyOnceSupport} call throw a NullPointerException and
     * expects validation to report exactly one error on "exactly.once.support" that contains
     * the exception's message.
     */
    @Test
    public void testExactlyOnceSourceSupportValidationHandlesConnectorErrorsGracefully() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        connectorProps.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        EasyMock.expect(connector.exactlyOnceSupport(EasyMock.eq(connectorProps)))
                .andThrow(new NullPointerException("time to add a new unit test :)"));
        PowerMock.replayAll(connector);

        List errorMessages = ((ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("exactly.once.support")).errorMessages();

        Assert.assertFalse(errorMessages.isEmpty());
        String firstError = (String) errorMessages.get(0);
        Assert.assertTrue(
                "Error message did not contain expected text: " + firstError,
                firstError.contains("time to add a new unit test :)"));
        Assert.assertEquals(1L, errorMessages.size());

        PowerMock.verifyAll();
    }

    /**
     * Uses the default herder (no call to {@code exactlyOnceHerder()}) and expects the single
     * error "This worker does not have exactly-once source support enabled." even though the
     * connector reports {@code ExactlyOnceSupport.SUPPORTED}.
     */
    @Test
    public void testExactlyOnceSourceSupportValidationWhenExactlyOnceNotEnabledOnWorker() {
        Map connectorProps = new HashMap();
        connectorProps.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        EasyMock.expect(connector.exactlyOnceSupport(EasyMock.eq(connectorProps))).andReturn(ExactlyOnceSupport.SUPPORTED);
        PowerMock.replayAll(connector);

        ConfigValue supportValue = (ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("exactly.once.support");
        Assert.assertEquals(
                Collections.singletonList("This worker does not have exactly-once source support enabled."),
                supportValue.errorMessages());

        PowerMock.verifyAll();
    }

    /**
     * Supplies the literal value "invalid" for "exactly.once.support" and expects exactly one
     * error whose text contains "String must be one of (case insensitive): ". No expectations
     * are recorded on the connector mock.
     */
    @Test
    public void testExactlyOnceSourceSupportValidationHandlesInvalidValuesGracefully() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        connectorProps.put("exactly.once.support", "invalid");

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        PowerMock.replayAll(connector);

        List errorMessages = ((ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("exactly.once.support")).errorMessages();

        Assert.assertFalse(errorMessages.isEmpty());
        String firstError = (String) errorMessages.get(0);
        Assert.assertTrue(
                "Error message did not contain expected text: " + firstError,
                firstError.contains("String must be one of (case insensitive): "));
        Assert.assertEquals(1L, errorMessages.size());

        PowerMock.verifyAll();
    }

    /**
     * Requests connector-defined transaction boundaries from a connector that reports
     * {@code ConnectorTransactionBoundaries.SUPPORTED} and expects no error messages on
     * "transaction.boundary".
     */
    @Test
    public void testConnectorTransactionBoundaryValidation() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        connectorProps.put("transaction.boundary", SourceTask.TransactionBoundary.CONNECTOR.toString());

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        EasyMock.expect(connector.canDefineTransactionBoundaries(EasyMock.eq(connectorProps)))
                .andReturn(ConnectorTransactionBoundaries.SUPPORTED);
        PowerMock.replayAll(connector);

        ConfigValue boundaryValue = (ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("transaction.boundary");
        Assert.assertEquals(Collections.emptyList(), boundaryValue.errorMessages());

        PowerMock.verifyAll();
    }

    /**
     * Requests connector-defined transaction boundaries from a connector that reports
     * {@code ConnectorTransactionBoundaries.UNSUPPORTED} and expects exactly one error on
     * "transaction.boundary" saying such boundaries are not supported.
     */
    @Test
    public void testConnectorTransactionBoundaryValidationOnUnsupportedConnector() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        connectorProps.put("transaction.boundary", SourceTask.TransactionBoundary.CONNECTOR.toString());

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        EasyMock.expect(connector.canDefineTransactionBoundaries(EasyMock.eq(connectorProps)))
                .andReturn(ConnectorTransactionBoundaries.UNSUPPORTED);
        PowerMock.replayAll(connector);

        List errorMessages = ((ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("transaction.boundary")).errorMessages();

        Assert.assertFalse(errorMessages.isEmpty());
        String firstError = (String) errorMessages.get(0);
        Assert.assertTrue(
                "Error message did not contain expected text: " + firstError,
                firstError.contains("The connector does not support connector-defined transaction boundaries with the given configuration."));
        Assert.assertEquals(1L, errorMessages.size());

        PowerMock.verifyAll();
    }

    /**
     * Makes the connector's {@code canDefineTransactionBoundaries} call throw a ConnectException
     * and expects exactly one error on "transaction.boundary" containing the exception message.
     */
    @Test
    public void testConnectorTransactionBoundaryValidationHandlesConnectorErrorsGracefully() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        connectorProps.put("transaction.boundary", SourceTask.TransactionBoundary.CONNECTOR.toString());

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        EasyMock.expect(connector.canDefineTransactionBoundaries(EasyMock.eq(connectorProps)))
                .andThrow(new ConnectException("Wait I thought we tested for this?"));
        PowerMock.replayAll(connector);

        List errorMessages = ((ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("transaction.boundary")).errorMessages();

        Assert.assertFalse(errorMessages.isEmpty());
        String firstError = (String) errorMessages.get(0);
        Assert.assertTrue(
                "Error message did not contain expected text: " + firstError,
                firstError.contains("Wait I thought we tested for this?"));
        Assert.assertEquals(1L, errorMessages.size());

        PowerMock.verifyAll();
    }

    /**
     * Supplies an unrecognized value for "transaction.boundary" and expects exactly one error
     * whose text contains "String must be one of (case insensitive): ". No expectations are
     * recorded on the connector mock.
     */
    @Test
    public void testConnectorTransactionBoundaryValidationHandlesInvalidValuesGracefully() {
        herder = exactlyOnceHerder();

        Map connectorProps = new HashMap();
        // The literal text below (not the result of calling toString()) serves as the invalid value.
        connectorProps.put("transaction.boundary", "CONNECTOR.toString()");

        SourceConnector connector = (SourceConnector) PowerMock.createMock(SourceConnector.class);
        PowerMock.replayAll(connector);

        List errorMessages = ((ConfigValue) herder
                .validateSourceConnectorConfig(connector, SourceConnectorConfig.configDef(), connectorProps)
                .get("transaction.boundary")).errorMessages();

        Assert.assertFalse(errorMessages.isEmpty());
        String firstError = (String) errorMessages.get(0);
        Assert.assertTrue(
                "Error message did not contain expected text: " + firstError,
                firstError.contains("String must be one of (case insensitive): "));
        Assert.assertEquals(1L, errorMessages.size());

        PowerMock.verifyAll();
    }

    /**
     * Submits CONN1_CONFIG with {@code allowReplace = false} against the SNAPSHOT config state
     * and expects the put callback to be completed with some error and a null result.
     */
    @Test
    public void testCreateConnectorAlreadyExists() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);

        // Validation of the submitted config succeeds.
        Capture validationCallback = EasyMock.newCapture();
        herder.validateConnectorConfig((Map) EasyMock.eq(CONN1_CONFIG), (Callback) EasyMock.capture(validationCallback));
        PowerMock.expectLastCall().andAnswer(() -> {
            Callback completion = (Callback) validationCallback.getValue();
            completion.onCompletion((Throwable) null, CONN1_CONFIG_INFOS);
            return null;
        });

        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        // The callback gets an error (any Throwable) and no result.
        putConnectorCallback.onCompletion((Throwable) EasyMock.anyObject(), EasyMock.isNull());
        PowerMock.expectLastCall();

        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Expectations for the second tick.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback);
        herder.tick();
        herder.tick();

        time.sleep(1000L);
        assertStatistics(3, 1, 100.0, 1000.0);

        PowerMock.verifyAll();
    }

    /**
     * Deletes CONN1 while it is assigned to this worker. The first tick is expected to start the
     * connector, remove its config from the backing store and complete the callback with
     * {@code Herder.Created(false, null)}; after the config-removal notification, a second tick
     * goes through a rebalance that revokes CONN1 and TASK1 and deletes its topic statuses.
     */
    @Test
    public void testDestroyConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);

        // CONN1 is assigned to this worker in the first rebalance.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);

        Capture startCallback = EasyMock.newCapture();
        worker.startConnector(
                (String) EasyMock.eq(CONN1),
                (Map) EasyMock.anyObject(),
                (CloseableConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder),
                (TargetState) EasyMock.eq(TargetState.STARTED),
                (Callback) EasyMock.capture(startCallback));
        PowerMock.expectLastCall().andAnswer(() -> {
            Callback completion = (Callback) startCallback.getValue();
            completion.onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });

        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);

        // The delete request removes the config and reports success to the caller.
        configBackingStore.removeConnectorConfig(CONN1);
        PowerMock.expectLastCall();
        putConnectorCallback.onCompletion((Throwable) null, new Herder.Created(false, (Object) null));
        PowerMock.expectLastCall();

        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        member.wakeup();
        PowerMock.expectLastCall();

        // Topic statuses for CONN1 are listed and deleted; each expectation is set for two calls.
        EasyMock.expect(statusBackingStore.getAllTopics((String) EasyMock.eq(CONN1)))
                .andReturn(new HashSet(Arrays.asList(
                        new TopicStatus(FOO_TOPIC, CONN1, 0, time.milliseconds()),
                        new TopicStatus(BAR_TOPIC, CONN1, 0, time.milliseconds()))))
                .times(2);
        statusBackingStore.deleteTopic((String) EasyMock.eq(CONN1), (String) EasyMock.eq(FOO_TOPIC));
        PowerMock.expectLastCall().times(2);
        statusBackingStore.deleteTopic((String) EasyMock.eq(CONN1), (String) EasyMock.eq(BAR_TOPIC));
        PowerMock.expectLastCall().times(2);

        // Second rebalance: CONN1 and TASK1 are revoked and nothing is assigned.
        expectRebalance(
                Arrays.asList(CONN1), Arrays.asList(TASK1),
                (short) 0, 2L, "leader", "leaderUrl",
                Collections.emptyList(), Collections.emptyList(), 0, true);
        expectConfigRefreshAndSnapshot(ClusterConfigState.EMPTY);
        member.requestRejoin();
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.deleteConnectorConfig(CONN1, putConnectorCallback);
        herder.tick();
        time.sleep(1000L);
        assertStatistics("leaderUrl", false, 3, 1, 100.0, 1000.0);

        // Simulate the config store notifying the herder that CONN1's config was removed.
        configUpdateListener.onConnectorConfigRemove(CONN1);
        herder.configState = ClusterConfigState.EMPTY;
        herder.tick();
        time.sleep(1000L);
        assertStatistics("leaderUrl", true, 3, 1, 100.0, 2100.0);

        PowerMock.verifyAll();
    }

    /**
     * Restarts CONN1 while it is assigned to this worker: the connector is started on the first
     * tick, then stopped and started again on the tick that follows the restart request, and
     * the restart callback completes within one second.
     */
    @Test
    public void testRestartConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Initial start of the assigned connector.
        Capture initialStartCallback = EasyMock.newCapture();
        worker.startConnector(
                (String) EasyMock.eq(CONN1),
                (Map) EasyMock.anyObject(),
                (CloseableConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder),
                (TargetState) EasyMock.eq(TargetState.STARTED),
                (Callback) EasyMock.capture(initialStartCallback));
        PowerMock.expectLastCall().andAnswer(() -> {
            Callback completion = (Callback) initialStartCallback.getValue();
            completion.onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);

        // Second tick, triggered by the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // The restart stops the connector and starts it again.
        worker.stopAndAwaitConnector(CONN1);
        PowerMock.expectLastCall();
        Capture restartStartCallback = EasyMock.newCapture();
        worker.startConnector(
                (String) EasyMock.eq(CONN1),
                (Map) EasyMock.anyObject(),
                (CloseableConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder),
                (TargetState) EasyMock.eq(TargetState.STARTED),
                (Callback) EasyMock.capture(restartStartCallback));
        PowerMock.expectLastCall().andAnswer(() -> {
            Callback completion = (Callback) restartStartCallback.getValue();
            completion.onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartCallback = new FutureCallback();
        herder.restartConnector(CONN1, restartCallback);
        herder.tick();
        restartCallback.get(1000L, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of CONN2 on the leader after a rebalance that assigns nothing, and
     * expects the callback to fail with a {@code NotFoundException} as the cause.
     */
    @Test
    public void testRestartUnknownConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick, triggered by the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartCallback = new FutureCallback();
        herder.restartConnector(CONN2, restartCallback);
        herder.tick();

        try {
            restartCallback.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Expected NotFoundException to be raised");
        } catch (ExecutionException failure) {
            Assert.assertTrue(failure.getCause() instanceof NotFoundException);
        }

        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of CONN1 on a worker whose member id is "member" (the rebalance is
     * set up without the trailing flag the leader tests pass) and expects the callback to fail
     * with a {@code NotLeaderException} as the cause.
     */
    @Test
    public void testRestartConnectorRedirectToLeader() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick, triggered by the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartCallback = new FutureCallback();
        herder.restartConnector(CONN1, restartCallback);
        herder.tick();

        try {
            restartCallback.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Expected NotLeaderException to be raised");
        } catch (ExecutionException failure) {
            Assert.assertTrue(failure.getCause() instanceof NotLeaderException);
        }

        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of CONN1 on the leader when CONN1 is not assigned to it, and expects
     * the callback to fail with a {@code NotAssignedException} whose forward URL is the
     * "ownerUrl" returned by {@code member.ownerUrl(CONN1)}.
     */
    @Test
    public void testRestartConnectorRedirectToOwner() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick, triggered by the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        EasyMock.expect(member.ownerUrl(CONN1)).andReturn("ownerUrl");

        PowerMock.replayAll();

        herder.tick();
        time.sleep(1000L);
        assertStatistics(3, 1, 100.0, 1000.0);

        FutureCallback restartCallback = new FutureCallback();
        herder.restartConnector(CONN1, restartCallback);
        herder.tick();
        time.sleep(2000L);
        assertStatistics(3, 1, 100.0, 3000.0);

        try {
            restartCallback.get(1000L, TimeUnit.MILLISECONDS);
            // The failure message names the exception this test actually asserts on.
            Assert.fail("Expected NotAssignedException to be raised");
        } catch (ExecutionException failure) {
            Assert.assertTrue(failure.getCause() instanceof NotAssignedException);
            // getCause() is typed as Throwable; cast before reading the forward URL.
            NotAssignedException notAssigned = (NotAssignedException) failure.getCause();
            Assert.assertEquals("ownerUrl", notAssigned.forwardUrl());
        }

        PowerMock.verifyAll();
    }

    /**
     * Requests a connector-and-tasks restart for "UnknownConnector" on the leader and expects
     * the callback to fail with a {@code NotFoundException} cause and a message containing
     * "Unknown connector:".
     */
    @Test
    public void testRestartConnectorAndTasksUnknownConnector() throws Exception {
        RestartRequest restartRequest = new RestartRequest("UnknownConnector", false, true);

        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick, triggered by the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartCallback = new FutureCallback();
        herder.restartConnectorAndTasks(restartRequest, restartCallback);
        herder.tick();

        // The asserted exception has to come from waiting on the callback; an empty lambda
        // throws nothing, so assertThrows could never succeed.
        ExecutionException failure = Assert.assertThrows(
                ExecutionException.class,
                () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS));
        Assert.assertTrue(failure.getCause() instanceof NotFoundException);
        Assert.assertTrue(failure.getMessage().contains("Unknown connector:"));

        PowerMock.verifyAll();
    }

    /**
     * Requests a connector-and-tasks restart for CONN1 on a worker whose member id is "member"
     * and expects the callback to fail with a {@code NotLeaderException} as the cause.
     */
    @Test
    public void testRestartConnectorAndTasksNotLeader() throws Exception {
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);

        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick, triggered by the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartCallback = new FutureCallback();
        herder.restartConnectorAndTasks(restartRequest, restartCallback);
        herder.tick();

        // The asserted exception has to come from waiting on the callback; an empty lambda
        // throws nothing, so assertThrows could never succeed.
        ExecutionException failure = Assert.assertThrows(
                ExecutionException.class,
                () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS));
        Assert.assertTrue(failure.getCause() instanceof NotLeaderException);

        PowerMock.verifyAll();
    }

    /**
     * Requests a connector-and-tasks restart for CONN1 when {@code buildRestartPlan} yields an
     * empty Optional. The restart request is still written to the config backing store, and
     * the callback is expected to fail with a {@code NotFoundException} cause and a message
     * containing "Status for connector".
     */
    @Test
    public void testRestartConnectorAndTasksUnknownStatus() throws Exception {
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.empty()).anyTimes();

        configBackingStore.putRestartRequest(restartRequest);
        PowerMock.expectLastCall();

        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick, triggered by the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartCallback = new FutureCallback();
        herder.restartConnectorAndTasks(restartRequest, restartCallback);
        herder.tick();

        // The asserted exception has to come from waiting on the callback; an empty lambda
        // throws nothing, so assertThrows could never succeed.
        ExecutionException failure = Assert.assertThrows(
                ExecutionException.class,
                () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS));
        Assert.assertTrue(failure.getCause() instanceof NotFoundException);
        Assert.assertTrue(failure.getMessage().contains("Status for connector"));

        PowerMock.verifyAll();
    }

    /**
     * Requests a connector-and-tasks restart for CONN1 when {@code buildRestartPlan} yields a
     * plan, and expects the callback to complete with that plan's
     * {@code restartConnectorStateInfo()} after the request is written to the backing store.
     */
    @Test
    public void testRestartConnectorAndTasksSuccess() throws Exception {
        RestartPlan plan = (RestartPlan) PowerMock.createMock(RestartPlan.class);
        ConnectorStateInfo expectedStateInfo = (ConnectorStateInfo) PowerMock.createMock(ConnectorStateInfo.class);
        EasyMock.expect(plan.restartConnectorStateInfo()).andReturn(expectedStateInfo).anyTimes();

        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.of(plan)).anyTimes();

        configBackingStore.putRestartRequest(restartRequest);
        PowerMock.expectLastCall();

        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick, triggered by the restart request.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        FutureCallback restartCallback = new FutureCallback();
        herder.restartConnectorAndTasks(restartRequest, restartCallback);
        herder.tick();

        Assert.assertEquals(expectedStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS));

        PowerMock.verifyAll();
    }

    /**
     * Calls {@code doRestartConnectorAndTasks} when {@code buildRestartPlan} yields an empty
     * Optional; no other mock interactions are recorded, so verification passes only if the
     * herder touches nothing else.
     */
    @Test
    public void testDoRestartConnectorAndTasksEmptyPlan() {
        RestartRequest request = new RestartRequest(CONN1, false, true);
        EasyMock.expect(herder.buildRestartPlan(request)).andReturn(Optional.empty()).anyTimes();

        PowerMock.replayAll();

        herder.doRestartConnectorAndTasks(request);

        PowerMock.verifyAll();
    }

    /**
     * Calls {@code doRestartConnectorAndTasks} with a plan that wants to restart the connector
     * and one task while the herder's assignment is empty; no worker interactions are recorded.
     */
    @Test
    public void testDoRestartConnectorAndTasksNoAssignments() {
        ConnectorTaskId taskId = new ConnectorTaskId(CONN1, 0);
        RestartRequest request = new RestartRequest(CONN1, false, true);

        RestartPlan plan = (RestartPlan) PowerMock.createMock(RestartPlan.class);
        EasyMock.expect(plan.shouldRestartConnector()).andReturn(true).anyTimes();
        EasyMock.expect(plan.shouldRestartTasks()).andReturn(true).anyTimes();
        EasyMock.expect(plan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
        EasyMock.expect(herder.buildRestartPlan(request)).andReturn(Optional.of(plan)).anyTimes();

        PowerMock.replayAll();

        // Nothing is assigned to this worker.
        herder.assignment = ExtendedAssignment.empty();
        herder.doRestartConnectorAndTasks(request);

        PowerMock.verifyAll();
    }

    /**
     * Calls {@code doRestartConnectorAndTasks} when the mocked assignment contains CONN1 but no
     * tasks: the connector is expected to be stopped, started again and reported through
     * {@code onRestart(CONN1)}, with no task restarts recorded.
     */
    @Test
    public void testDoRestartConnectorAndTasksOnlyConnector() {
        ConnectorTaskId taskId = new ConnectorTaskId(CONN1, 0);
        RestartRequest request = new RestartRequest(CONN1, false, true);

        RestartPlan plan = (RestartPlan) PowerMock.createMock(RestartPlan.class);
        EasyMock.expect(plan.shouldRestartConnector()).andReturn(true).anyTimes();
        EasyMock.expect(plan.shouldRestartTasks()).andReturn(true).anyTimes();
        EasyMock.expect(plan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
        EasyMock.expect(herder.buildRestartPlan(request)).andReturn(Optional.of(plan)).anyTimes();

        // The worker owns the connector but none of its tasks.
        herder.assignment = (ExtendedAssignment) PowerMock.createMock(ExtendedAssignment.class);
        EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.singletonList(CONN1)).anyTimes();
        EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.emptyList()).anyTimes();

        worker.stopAndAwaitConnector(CONN1);
        PowerMock.expectLastCall();

        Capture startCallback = EasyMock.newCapture();
        worker.startConnector(
                (String) EasyMock.eq(CONN1),
                (Map) EasyMock.anyObject(),
                (CloseableConnectorContext) EasyMock.anyObject(),
                (ConnectorStatus.Listener) EasyMock.eq(herder),
                (TargetState) EasyMock.anyObject(TargetState.class),
                (Callback) EasyMock.capture(startCallback));
        PowerMock.expectLastCall().andAnswer(() -> {
            Callback completion = (Callback) startCallback.getValue();
            completion.onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });

        member.wakeup();
        PowerMock.expectLastCall();

        herder.onRestart(CONN1);
        EasyMock.expectLastCall();

        PowerMock.replayAll();

        herder.doRestartConnectorAndTasks(request);

        PowerMock.verifyAll();
    }

    /**
     * Runs {@code doRestartConnectorAndTasks} while the mocked assignment holds no connectors
     * and only {@code TASK0}, although the restart plan lists {@code TASK0}, {@code TASK1} and
     * {@code TASK2}. The recorded expectations cover stopping, {@code onRestart} and starting
     * of {@code TASK0} only.
     */
    @Test
    public void testDoRestartConnectorAndTasksOnlyTasks() {
        RestartRequest request = new RestartRequest(CONN1, false, true);

        RestartPlan plan = PowerMock.createMock(RestartPlan.class);
        EasyMock.expect(plan.shouldRestartConnector()).andReturn(true).anyTimes();
        EasyMock.expect(plan.shouldRestartTasks()).andReturn(true).anyTimes();
        EasyMock.expect(plan.taskIdsToRestart()).andReturn(Arrays.asList(TASK0, TASK1, TASK2)).anyTimes();
        EasyMock.expect(plan.restartTaskCount()).andReturn(3).anyTimes();
        EasyMock.expect(plan.totalTaskCount()).andReturn(3).anyTimes();
        EasyMock.expect(herder.buildRestartPlan(request)).andReturn(Optional.of(plan)).anyTimes();

        // The assignment holds a single task and no connector.
        herder.assignment = PowerMock.createMock(ExtendedAssignment.class);
        EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.emptyList()).anyTimes();
        EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(TASK0)).anyTimes();
        herder.configState = SNAPSHOT;

        worker.stopAndAwaitTasks(Collections.singletonList(TASK0));
        PowerMock.expectLastCall();
        herder.onRestart(TASK0);
        EasyMock.expectLastCall();
        worker.startSourceTask(EasyMock.eq(TASK0), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.anyObject(TargetState.class));
        PowerMock.expectLastCall().andReturn(true);

        PowerMock.replayAll();
        herder.doRestartConnectorAndTasks(request);
        PowerMock.verifyAll();
    }

    /**
     * Runs {@code doRestartConnectorAndTasks} with a mocked assignment that holds both
     * {@code CONN1} and its task 0, and verifies that stop, {@code onRestart} and start are
     * recorded for the connector as well as for the task.
     */
    @Test
    public void testDoRestartConnectorAndTasksBoth() {
        ConnectorTaskId taskId = new ConnectorTaskId(CONN1, 0);
        RestartRequest request = new RestartRequest(CONN1, false, true);

        RestartPlan plan = PowerMock.createMock(RestartPlan.class);
        EasyMock.expect(plan.shouldRestartConnector()).andReturn(true).anyTimes();
        EasyMock.expect(plan.shouldRestartTasks()).andReturn(true).anyTimes();
        EasyMock.expect(plan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
        EasyMock.expect(plan.restartTaskCount()).andReturn(1).anyTimes();
        EasyMock.expect(plan.totalTaskCount()).andReturn(1).anyTimes();
        EasyMock.expect(herder.buildRestartPlan(request)).andReturn(Optional.of(plan)).anyTimes();

        // The assignment holds the connector and its task.
        herder.assignment = PowerMock.createMock(ExtendedAssignment.class);
        EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.singletonList(CONN1)).anyTimes();
        EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(taskId)).anyTimes();
        herder.configState = SNAPSHOT;

        // Connector side: stop, start (the start callback is captured but never completed here).
        worker.stopAndAwaitConnector(CONN1);
        PowerMock.expectLastCall();
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.anyObject(TargetState.class), EasyMock.capture(onStart));
        herder.onRestart(CONN1);
        EasyMock.expectLastCall();

        // Task side: stop, onRestart, start.
        worker.stopAndAwaitTasks(Collections.singletonList(taskId));
        PowerMock.expectLastCall();
        herder.onRestart(taskId);
        EasyMock.expectLastCall();
        worker.startSourceTask(EasyMock.eq(TASK0), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.anyObject(TargetState.class));
        PowerMock.expectLastCall().andReturn(true);

        PowerMock.replayAll();
        herder.doRestartConnectorAndTasks(request);
        PowerMock.verifyAll();
    }

    /**
     * Ticks the herder once so that {@code TASK0} is started, then submits a
     * {@code restartTask(TASK0, ...)} request and ticks again. The recorded expectations
     * require the worker to stop {@code TASK0} and start it a second time, and the restart
     * callback must complete within one second.
     */
    @Test
    public void testRestartTask() throws Exception {
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS);
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);

        // Initial rebalance followed by the first start of TASK0.
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        worker.startSourceTask(EasyMock.eq(TASK0), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);

        // Restart request: wakeup, then stop and start the task again.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        worker.stopAndAwaitTask(TASK0);
        PowerMock.expectLastCall();
        worker.startSourceTask(EasyMock.eq(TASK0), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);

        PowerMock.replayAll();
        herder.tick();
        FutureCallback<Void> callback = new FutureCallback<>();
        herder.restartTask(TASK0, callback);
        herder.tick();
        callback.get(1000L, TimeUnit.MILLISECONDS);
        PowerMock.verifyAll();
    }

    /**
     * Requests a restart of task 0 of a connector named {@code "blah"} and expects the restart
     * callback to fail with an {@link ExecutionException} whose cause is a
     * {@code NotFoundException}.
     *
     * @throws Exception if waiting on the callback fails in an unexpected way
     */
    @Test
    public void testRestartUnknownTask() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);

        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        FutureCallback<Void> callback = new FutureCallback<>();
        herder.tick();
        herder.restartTask(new ConnectorTaskId("blah", 0), callback);
        herder.tick();
        try {
            callback.get(1000L, TimeUnit.MILLISECONDS);
            // The failure message names the exception that the catch block actually asserts.
            Assert.fail("Expected NotFoundException to be raised");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof NotFoundException);
        }
        PowerMock.verifyAll();
    }

    /**
     * Adds four requests whose first (long) arguments are 100, 10, 200 and 200 and checks that
     * {@code herder.requests.pollFirst()} returns them in the order 10, 100, 200, 200, with
     * the two 200 requests coming out in the order they were added.
     */
    @Test
    public void testRequestProcessingOrder() {
        DistributedHerder.DistributedHerderRequest at100 = herder.addRequest(100L, (Callable) null, (Callback) null);
        DistributedHerder.DistributedHerderRequest at10 = herder.addRequest(10L, (Callable) null, (Callback) null);
        DistributedHerder.DistributedHerderRequest firstAt200 = herder.addRequest(200L, (Callable) null, (Callback) null);
        DistributedHerder.DistributedHerderRequest secondAt200 = herder.addRequest(200L, (Callable) null, (Callback) null);

        Assert.assertEquals(at10, herder.requests.pollFirst());
        Assert.assertEquals(at100, herder.requests.pollFirst());
        Assert.assertEquals(firstAt200, herder.requests.pollFirst());
        Assert.assertEquals(secondAt200, herder.requests.pollFirst());
    }

    /**
     * Submits {@code restartTask(TASK0, ...)} on a herder whose member id is stubbed as
     * {@code "member"} and whose recorded rebalance carries no connectors or tasks, and
     * expects the callback to fail with an {@link ExecutionException} caused by a
     * {@code NotLeaderException}.
     */
    @Test
    public void testRestartTaskRedirectToLeader() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);

        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        FutureCallback<Void> callback = new FutureCallback<>();
        herder.restartTask(TASK0, callback);
        herder.tick();
        try {
            callback.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Expected NotLeaderException to be raised");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof NotLeaderException);
        }
        PowerMock.verifyAll();
    }

    /**
     * Submits {@code restartTask(TASK0, ...)} on a herder whose member id is stubbed as
     * {@code "leader"} while the recorded rebalance carries no connectors or tasks, with
     * {@code member.ownerUrl(TASK0)} returning {@code "ownerUrl"}. The callback is expected to
     * fail with an {@link ExecutionException} caused by a {@code NotAssignedException} whose
     * forward URL is {@code "ownerUrl"}.
     *
     * @throws Exception if waiting on the callback fails in an unexpected way
     */
    @Test
    public void testRestartTaskRedirectToOwner() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);

        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        EasyMock.expect(member.ownerUrl(TASK0)).andReturn("ownerUrl");
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        FutureCallback<Void> callback = new FutureCallback<>();
        herder.restartTask(TASK0, callback);
        herder.tick();
        try {
            callback.get(1000L, TimeUnit.MILLISECONDS);
            // The failure message names the exception that the catch block actually asserts.
            Assert.fail("Expected NotAssignedException to be raised");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof NotAssignedException);
            // getCause() is typed as Throwable, so a cast is required to reach forwardUrl().
            NotAssignedException notAssigned = (NotAssignedException) e.getCause();
            Assert.assertEquals("ownerUrl", notAssigned.forwardUrl());
        }
        PowerMock.verifyAll();
    }

    /**
     * Ticks the herder with an empty initial rebalance, signals
     * {@code onConnectorConfigUpdate(CONN1)} and ticks twice more. The recorded expectations
     * require the herder to read a config snapshot, request a rejoin, and, after a second
     * rebalance that carries {@code CONN1}, start the connector and reconfigure its tasks.
     */
    @Test
    public void testConnectorConfigAdded() {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);

        // Initial rebalance with nothing assigned.
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Config update: snapshot is read and a rejoin is requested.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
        member.requestRejoin();
        PowerMock.expectLastCall();

        // Second rebalance carrying CONN1. An explicit type witness is used because a cast
        // cannot drive inference of emptyList()'s type argument.
        expectRebalance(Collections.<String>emptyList(), Collections.emptyList(), (short) 0, 1L, Arrays.asList(CONN1), Collections.emptyList());
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            // Complete the start callback synchronously with a successful STARTED result.
            onStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onConnectorConfigUpdate(CONN1);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts {@code CONN1} on the first tick, then signals
     * {@code onConnectorConfigUpdate(CONN1)} and ticks twice more. The recorded expectations
     * require the herder to read a fresh snapshot, stop the connector, start it again and run
     * task reconfiguration a second time.
     */
    @Test
    public void testConnectorConfigUpdate() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));

        // Initial rebalance and first connector start.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        Capture<Callback<TargetState>> onFirstStart = EasyMock.newCapture();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onFirstStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onFirstStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Config update: snapshot, stop, and second connector start.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
        worker.stopAndAwaitConnector(CONN1);
        PowerMock.expectLastCall();
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        Capture<Callback<TargetState>> onSecondStart = EasyMock.newCapture();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onSecondStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onSecondStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onConnectorConfigUpdate(CONN1);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts {@code CONN1}, then signals a connector config update while the config store
     * returns a snapshot whose {@code WorkerConfigTransformer} mock throws a
     * {@link ConfigException} from {@code transform}. The test expects the connector to be
     * stopped and a {@code ConnectorStatus} for {@code CONN1} in state {@code FAILED} to be
     * written through {@code statusBackingStore.putSafe}.
     */
    @Test
    public void testConnectorConfigUpdateFailedTransformation() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
        WorkerConfigTransformer transformer = EasyMock.mock(WorkerConfigTransformer.class);

        // Initial rebalance and connector start.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Config update: the new snapshot carries the transformer that throws.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), transformer));
        EasyMock.expect(transformer.transform(EasyMock.eq(CONN1), (Map) EasyMock.anyObject())).andThrow(new ConfigException("Simulated exception thrown during config transformation"));
        worker.stopAndAwaitConnector(CONN1);
        PowerMock.expectLastCall();
        worker.logEventsDeduplicateErrors();
        PowerMock.expectLastCall().andStubReturn(false);
        worker.logEventsEmitter();
        PowerMock.expectLastCall().andStubReturn(EasyMock.mock(LogEventsEmitter.class));
        Capture<ConnectorStatus> failedStatus = EasyMock.newCapture();
        statusBackingStore.putSafe(EasyMock.capture(failedStatus));
        PowerMock.expectLastCall();
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // The transformer is created with EasyMock.mock, so it is replayed explicitly.
        EasyMock.replay(transformer);
        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onConnectorConfigUpdate(CONN1);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();

        Assert.assertEquals(CONN1, failedStatus.getValue().id());
        Assert.assertEquals(AbstractStatus.State.FAILED, failedStatus.getValue().state());
    }

    /**
     * Starts {@code CONN1} in the STARTED state, then signals a target-state change while the
     * config store returns {@code SNAPSHOT_PAUSED_CONN1}, and expects the herder to call
     * {@code worker.setTargetState(CONN1, PAUSED, ...)}. The callback passed to
     * {@code setTargetState} is captured and completed with PAUSED.
     *
     * @throws Exception declared by the test signature; no checked exception is expected
     */
    @Test
    public void testConnectorPaused() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));

        // Initial rebalance and connector start.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Target-state change: the new snapshot has CONN1 paused.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
        PowerMock.expectLastCall();
        // Capture the callback handed to setTargetState and complete that one (not the start
        // callback captured earlier), so the pause callback is what actually gets exercised.
        Capture<Callback<TargetState>> onPause = EasyMock.newCapture();
        worker.setTargetState(EasyMock.eq(CONN1), EasyMock.eq(TargetState.PAUSED), EasyMock.capture(onPause));
        PowerMock.expectLastCall().andAnswer(() -> {
            onPause.getValue().onCompletion(null, TargetState.PAUSED);
            return null;
        });
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onConnectorTargetStateChange(CONN1);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts {@code CONN1} in the PAUSED state (the first snapshot is
     * {@code SNAPSHOT_PAUSED_CONN1}), then signals a target-state change while the config
     * store returns {@code SNAPSHOT}. The recorded expectations require
     * {@code worker.setTargetState(CONN1, STARTED, ...)}, whose callback is completed with
     * STARTED, followed by a member wakeup and a task reconfiguration.
     */
    @Test
    public void testConnectorResumed() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));

        // Initial rebalance; the connector comes up paused.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1);
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED), EasyMock.capture(onStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onStart.getValue().onCompletion(null, TargetState.PAUSED);
            return true;
        });
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Target-state change: the new snapshot has CONN1 started.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
        PowerMock.expectLastCall();
        Capture<Callback<TargetState>> onResume = EasyMock.newCapture();
        worker.setTargetState(EasyMock.eq(CONN1), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onResume));
        PowerMock.expectLastCall().andAnswer(() -> {
            onResume.getValue().onCompletion(null, TargetState.STARTED);
            return null;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onConnectorTargetStateChange(CONN1);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts {@code TASK0} on the first tick, then signals a target-state change for
     * {@code "unknown-connector"} and ticks again. Apart from reading a snapshot, no
     * {@code setTargetState} call is recorded for the second tick, so
     * {@code PowerMock.verifyAll()} fails if the herder makes one.
     */
    @Test
    public void testUnknownConnectorPaused() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));

        // Initial rebalance and task start.
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        worker.startSourceTask(EasyMock.eq(TASK0), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Target-state change for a connector name that is not CONN1.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onConnectorTargetStateChange("unknown-connector");
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts only {@code TASK0} ({@code worker.connectorNames()} is stubbed as empty), then
     * signals a target-state change for {@code CONN1} while the config store returns
     * {@code SNAPSHOT_PAUSED_CONN1}. The herder is expected to call
     * {@code worker.setTargetState(CONN1, PAUSED, ...)}; the captured callback is completed
     * with PAUSED.
     */
    @Test
    public void testConnectorPausedRunningTaskOnly() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.emptySet());

        // Initial rebalance and task start.
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        worker.startSourceTask(EasyMock.eq(TASK0), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Target-state change: the new snapshot has CONN1 paused.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
        PowerMock.expectLastCall();
        Capture<Callback<TargetState>> onPause = EasyMock.newCapture();
        worker.setTargetState(EasyMock.eq(CONN1), EasyMock.eq(TargetState.PAUSED), EasyMock.capture(onPause));
        PowerMock.expectLastCall().andAnswer(() -> {
            onPause.getValue().onCompletion(null, TargetState.PAUSED);
            return null;
        });
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onConnectorTargetStateChange(CONN1);
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Starts only {@code TASK0} in the PAUSED state ({@code worker.connectorNames()} is
     * stubbed as empty and the first snapshot is {@code SNAPSHOT_PAUSED_CONN1}), then signals
     * a target-state change for {@code CONN1} while the config store returns
     * {@code SNAPSHOT}. The herder is expected to call
     * {@code worker.setTargetState(CONN1, STARTED, ...)}; the captured callback is completed
     * with STARTED, followed by a member wakeup and
     * {@code expectExecuteTaskReconfiguration(false, null, null)}.
     */
    @Test
    public void testConnectorResumedRunningTaskOnly() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.emptySet());

        // Initial rebalance; the task comes up paused.
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1);
        worker.startSourceTask(EasyMock.eq(TASK0), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Target-state change: the new snapshot has CONN1 started.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
        PowerMock.expectLastCall();
        Capture<Callback<TargetState>> onResume = EasyMock.newCapture();
        worker.setTargetState(EasyMock.eq(CONN1), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onResume));
        PowerMock.expectLastCall().andAnswer(() -> {
            onResume.getValue().onCompletion(null, TargetState.STARTED);
            return null;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(false, null, null);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onConnectorTargetStateChange(CONN1);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Ticks the herder with an empty initial rebalance, signals
     * {@code onTaskConfigUpdate} for {@code TASK0}, {@code TASK1} and {@code TASK2}, and ticks
     * twice more. The recorded expectations require the herder to read a config snapshot,
     * request a rejoin, and, after a second rebalance that carries {@code TASK0}, start that
     * task in the STARTED state.
     */
    @Test
    public void testTaskConfigAdded() {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);

        // Initial rebalance with nothing assigned.
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Task config update: snapshot is read and a rejoin is requested.
        member.wakeup();
        PowerMock.expectLastCall();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
        member.requestRejoin();
        PowerMock.expectLastCall();

        // Second rebalance carrying TASK0. An explicit type witness is used because a cast
        // cannot drive inference of emptyList()'s type argument.
        expectRebalance(Collections.<String>emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.emptyList(), Arrays.asList(TASK0));
        worker.startSourceTask(EasyMock.eq(TASK0), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK0, TASK1, TASK2));
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Scenario in which the first config-log refresh after joining as "leader" throws a
     * {@link TimeoutException}. The recorded expectations require the member to leave the group
     * ("taking too long to read the log") and request a rejoin; a later rebalance assigns CONN1 and
     * TASK1, which must then be started. Clock advancement and rebalance statistics are asserted
     * after each of the first two ticks.
     */
    @Test
    public void testJoinLeaderCatchUpFails() throws Exception {
        // ---- record: join, then fail to read the config log ----
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 1, 1L, "leader", "leaderUrl", Collections.emptyList(), Collections.emptyList(), 0, true);
        configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
        EasyMock.expectLastCall().andThrow(new TimeoutException());
        member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
        EasyMock.expectLastCall();
        member.requestRejoin();

        // ---- record: rebalance that hands out CONN1 / TASK1 ----
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 0);
        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
        worker.startConnector(EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart));
        // Complete the start callback synchronously so the connector counts as started.
        PowerMock.expectLastCall().andAnswer(() -> {
            onStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
        worker.startSourceTask((ConnectorTaskId) EasyMock.eq(TASK1), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (TaskStatus.Listener) EasyMock.eq(herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // ---- record: one more (empty) rebalance ----
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        // ---- exercise: tick with the failing refresh ----
        long beforeFirstTick = time.milliseconds();
        herder.tick();
        Assert.assertEquals(beforeFirstTick + 100 + 300000, time.milliseconds());
        time.sleep(1000L);
        assertStatistics("leaderUrl", true, 3, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);

        // ---- exercise: tick that completes the rebalance ----
        long beforeSecondTick = time.milliseconds();
        herder.tick();
        Assert.assertEquals(beforeSecondTick + 100, time.milliseconds());
        time.sleep(2000L);
        assertStatistics("leaderUrl", false, 3, 1, 100.0d, 2000.0d);

        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Runs the leader catch-up scenario with protocol version 1. After an initial assignment of
     * CONN1 / TASK1 has been started, the config-log refresh is recorded to throw
     * {@link TimeoutException} three times, each paired with an expected
     * {@code maybeLeaveGroup("taking too long to read the log")}. The test then asserts how far the
     * mock clock advances on each of the three retry ticks, and that the following tick advances it
     * by exactly 100 ms.
     */
    @Test
    public void testJoinLeaderCatchUpRetriesForIncrementalCooperative() throws Exception {
        // Both the test fixture field and the mocked member report protocol version 1.
        this.connectProtocolVersion = (short) 1;
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 1);
        // --- record: initial rebalance and config read ---
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // --- record: rebalance assigning CONN1 / TASK1, which must then be started ---
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true);
        Capture newCapture = EasyMock.newCapture();
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED), (Callback) EasyMock.capture(newCapture));
        // Complete the captured start callback immediately with STARTED.
        PowerMock.expectLastCall().andAnswer(() -> {
            ((Callback) newCapture.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });
        this.member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, () -> {
            return TASK_CONFIGS;
        });
        this.worker.startSourceTask((ConnectorTaskId) EasyMock.eq(TASK1), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (TaskStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // --- record: another rebalance, after which the config refresh keeps timing out ---
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 1, 1L, "leader", "leaderUrl", Collections.emptyList(), Collections.emptyList(), 0, true);
        this.member.requestRejoin();
        // Three refresh timeouts, each paired with an expected maybeLeaveGroup call.
        for (int i = 2; i >= 0; i--) {
            this.configBackingStore.refresh(EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject(TimeUnit.class));
            EasyMock.expectLastCall().andThrow(new TimeoutException());
            this.member.maybeLeaveGroup((String) EasyMock.eq("taking too long to read the log"));
            EasyMock.expectLastCall();
        }
        // --- record: final rebalance followed by a config refresh/snapshot of SNAPSHOT ---
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        // --- exercise ---
        assertStatistics(0, 0, 0.0d, Double.POSITIVE_INFINITY);
        this.herder.tick();
        this.time.sleep(2000L);
        assertStatistics(3, 1, 100.0d, 2000.0d);
        this.herder.tick();
        // Extra 100 ms is expected on the first retry tick only; it is reset to 0 at the end of the loop body.
        // NOTE(review): presumably a fixed poll/discovery timeout applied once — confirm against the herder.
        int i2 = 100;
        // Divisor counts down 5, 4, 3, so the expected per-tick delay grows with each failed attempt.
        for (int i3 = 5; i3 >= 5 - 2; i3--) {
            long milliseconds = this.time.milliseconds();
            int i4 = (DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10) / i3;
            this.herder.tick();
            Assert.assertEquals(milliseconds + i2 + i4, this.time.milliseconds());
            i2 = 0;
        }
        // The tick after the retries must advance the clock by exactly 100 ms.
        long milliseconds2 = this.time.milliseconds();
        this.herder.tick();
        Assert.assertEquals(milliseconds2 + 100, this.time.milliseconds());
        PowerMock.verifyAll();
    }

    /**
     * Runs the leader catch-up scenario with protocol version 1 where the config-log refresh is
     * recorded to throw {@link TimeoutException} six times, each paired with an expected
     * {@code maybeLeaveGroup("taking too long to read the log")}. The test asserts the clock
     * advancement of five retry ticks, then that a further tick does not advance the clock at all
     * and that the assignment passed to {@code member.revokeAssignment} contains exactly CONN1 and
     * TASK1.
     */
    @Test
    public void testJoinLeaderCatchUpFailsForIncrementalCooperative() throws Exception {
        // Both the test fixture field and the mocked member report protocol version 1.
        this.connectProtocolVersion = (short) 1;
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 1);
        // --- record: initial rebalance and config read ---
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // --- record: rebalance assigning CONN1 / TASK1, which must then be started ---
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true);
        Capture newCapture = EasyMock.newCapture();
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED), (Callback) EasyMock.capture(newCapture));
        // Complete the captured start callback immediately with STARTED.
        PowerMock.expectLastCall().andAnswer(() -> {
            ((Callback) newCapture.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });
        this.member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, () -> {
            return TASK_CONFIGS;
        });
        this.worker.startSourceTask((ConnectorTaskId) EasyMock.eq(TASK1), (ClusterConfigState) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (Map) EasyMock.anyObject(), (TaskStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // --- record: another rebalance, after which the config refresh keeps timing out ---
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 1, 1L, "leader", "leaderUrl", Collections.emptyList(), Collections.emptyList(), 0, true);
        this.member.requestRejoin();
        // Six refresh timeouts, each paired with an expected maybeLeaveGroup call.
        for (int i = 5; i >= 0; i--) {
            this.configBackingStore.refresh(EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject(TimeUnit.class));
            EasyMock.expectLastCall().andThrow(new TimeoutException());
            this.member.maybeLeaveGroup((String) EasyMock.eq("taking too long to read the log"));
            EasyMock.expectLastCall();
        }
        // Capture the assignment handed to revokeAssignment so its contents can be asserted below.
        Capture newCapture2 = EasyMock.newCapture();
        this.member.revokeAssignment((ExtendedAssignment) EasyMock.capture(newCapture2));
        PowerMock.expectLastCall();
        // --- record: final rebalance followed by a config refresh/snapshot of SNAPSHOT ---
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        // --- exercise ---
        assertStatistics(0, 0, 0.0d, Double.POSITIVE_INFINITY);
        this.herder.tick();
        this.time.sleep(2000L);
        assertStatistics(3, 1, 100.0d, 2000.0d);
        this.herder.tick();
        // Extra 100 ms is expected on the first retry tick only; it is reset to 0 at the end of the loop body.
        // NOTE(review): presumably a fixed poll/discovery timeout applied once — confirm against the herder.
        int i2 = 100;
        // Divisor counts down 5..1, so the expected per-tick delay grows with each failed attempt.
        for (int i3 = 5; i3 > 0; i3--) {
            long milliseconds = this.time.milliseconds();
            int i4 = (DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10) / i3;
            this.herder.tick();
            Assert.assertEquals(milliseconds + i2 + i4, this.time.milliseconds());
            i2 = 0;
        }
        // The next tick must not advance the clock, and must have revoked exactly CONN1 / TASK1.
        long milliseconds2 = this.time.milliseconds();
        this.herder.tick();
        Assert.assertEquals(milliseconds2, this.time.milliseconds());
        Assert.assertEquals(Collections.singleton(CONN1), ((ExtendedAssignment) newCapture2.getValue()).connectors());
        Assert.assertEquals(Collections.singleton(TASK1), ((ExtendedAssignment) newCapture2.getValue()).tasks());
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Queues the four read accessors ({@code connectors}, {@code connectorInfo},
     * {@code connectorConfig}, {@code taskConfigs}) before a single herder tick and asserts that
     * each callback completes with the expected value.
     *
     * <p>A {@link WorkerConfigTransformer} mock that throws {@link AssertionError} on
     * {@code transform} is attached to a {@link ClusterConfigState} holding the same data as
     * {@code SNAPSHOT}. That state is handed to {@code expectConfigRefreshAndSnapshot} so that the
     * throwing transformer is part of a snapshot the herder can actually receive. Previously the
     * state was constructed and immediately discarded, so the guard could never fire.
     */
    @Test
    public void testAccessors() throws Exception {
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 0);
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins).anyTimes();
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT).times(2);

        // Transformer that fails the test if any accessor tries to transform connector config.
        WorkerConfigTransformer workerConfigTransformer = (WorkerConfigTransformer) EasyMock.mock(WorkerConfigTransformer.class);
        EasyMock.expect(workerConfigTransformer.transform((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject())).andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info"));
        EasyMock.replay(new Object[]{workerConfigTransformer});
        ClusterConfigState snapshotWithTransform = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), workerConfigTransformer);
        // Serve the transformer-backed snapshot instead of discarding it.
        expectConfigRefreshAndSnapshot(snapshotWithTransform);

        this.member.wakeup();
        PowerMock.expectLastCall().anyTimes();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 0);
        PowerMock.replayAll(new Object[0]);

        // Queue every accessor request first; they are all answered by the single tick below.
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectors(futureCallback);
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.connectorInfo(CONN1, futureCallback2);
        FutureCallback futureCallback3 = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback3);
        FutureCallback futureCallback4 = new FutureCallback();
        this.herder.taskConfigs(CONN1, futureCallback4);
        this.herder.tick();

        Assert.assertTrue(futureCallback.isDone());
        Assert.assertEquals(Collections.singleton(CONN1), futureCallback.get());
        Assert.assertTrue(futureCallback2.isDone());
        Assert.assertEquals(new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2), ConnectorType.SOURCE), futureCallback2.get());
        Assert.assertTrue(futureCallback3.isDone());
        Assert.assertEquals(CONN1_CONFIG, futureCallback3.get());
        Assert.assertTrue(futureCallback4.isDone());
        Assert.assertEquals(Arrays.asList(new TaskInfo(TASK0, TASK_CONFIG), new TaskInfo(TASK1, TASK_CONFIG), new TaskInfo(TASK2, TASK_CONFIG)), futureCallback4.get());
        PowerMock.verifyAll();
    }

    /**
     * Reads CONN1's config, replaces it with {@code CONN1_CONFIG_UPDATED} through
     * {@code putConnectorConfig}, and reads it again, ticking the herder after each request.
     * The recorded expectations require the connector to be started, then stopped and started
     * again once the backing store reports the config update; the assertions check the value
     * delivered to each callback.
     */
    @Test
    public void testPutConnectorConfig() throws Exception {
        // --- record: initial join with CONN1 assigned ---
        this.member.wakeup();
        PowerMock.expectLastCall();
        this.member.wakeup();
        PowerMock.expectLastCall();
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 0);
        Capture newCapture = EasyMock.newCapture();
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED), (Callback) EasyMock.capture(newCapture));
        // Complete the captured start callback immediately with STARTED.
        PowerMock.expectLastCall().andAnswer(() -> {
            ((Callback) newCapture.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });
        this.member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, () -> {
            return TASK_CONFIGS;
        });
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // --- record: handling of the putConnectorConfig request ---
        this.member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT);
        // The expectation is recorded on this.herder itself; the answer completes the validation
        // callback with CONN1_CONFIG_INFOS.
        // NOTE(review): this implies the herder is a partial mock — confirm in the fixture setup.
        Capture newCapture2 = EasyMock.newCapture();
        this.herder.validateConnectorConfig((Map) EasyMock.eq(CONN1_CONFIG_UPDATED), (Callback) EasyMock.capture(newCapture2));
        PowerMock.expectLastCall().andAnswer(() -> {
            ((Callback) newCapture2.getValue()).onCompletion((Throwable) null, CONN1_CONFIG_INFOS);
            return null;
        });
        this.member.wakeup();
        PowerMock.expectLastCall();
        // Writing the new config to the backing store notifies the config update listener from within the call.
        this.configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
        PowerMock.expectLastCall().andAnswer(() -> {
            this.configUpdateListener.onConnectorConfigUpdate(CONN1);
            return null;
        });
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG).times(2);
        // --- record: CONN1 is stopped and started once more ---
        this.worker.stopAndAwaitConnector(CONN1);
        PowerMock.expectLastCall();
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 0);
        Capture newCapture3 = EasyMock.newCapture();
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED), (Callback) EasyMock.capture(newCapture3));
        PowerMock.expectLastCall().andAnswer(() -> {
            ((Callback) newCapture3.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });
        this.member.wakeup();
        PowerMock.expectLastCall();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfigUpdated, () -> {
            return TASK_CONFIGS;
        });
        // --- record: final tick serving the second config read ---
        this.member.ensureActive();
        PowerMock.expectLastCall();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        // --- exercise 1: the original config is returned ---
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback);
        this.herder.tick();
        Assert.assertTrue(futureCallback.isDone());
        Assert.assertEquals(CONN1_CONFIG, futureCallback.get());
        // --- exercise 2: the update completes with Created(false, ...) carrying the updated config ---
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, futureCallback2);
        this.herder.tick();
        Assert.assertTrue(futureCallback2.isDone());
        Assert.assertEquals(new Herder.Created(false, new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2), ConnectorType.SOURCE)), futureCallback2.get());
        // --- exercise 3: a subsequent read returns the updated config ---
        FutureCallback futureCallback3 = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback3);
        this.herder.tick();
        Assert.assertTrue(futureCallback3.isDone());
        Assert.assertEquals(CONN1_CONFIG_UPDATED, futureCallback3.get());
        PowerMock.verifyAll();
    }

    /**
     * Three-tick scenario with protocol version 2. The first two recorded rebalances use the
     * default leader arguments and are followed by {@code member.poll(Long.MAX_VALUE)}; the third
     * names "member" / {@code MEMBER_URL} explicitly. After it the test expects a new session key to
     * be written to the config backing store and the poll timeout to be bounded by
     * {@code INTER_WORKER_KEY_TTL_MS_MS_DEFAULT}.
     */
    @Test
    public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
        final long keyTtlMs = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;

        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 2);

        // First recorded rebalance: unbounded poll expected.
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.poll(Long.MAX_VALUE);
        EasyMock.expectLastCall();

        // Second recorded rebalance: the snapshot now carries a session key created at time 0.
        expectRebalance(2L, Collections.emptyList(), Collections.emptyList());
        SessionKey staleSessionKey = new SessionKey(EasyMock.mock(SecretKey.class), 0L);
        ClusterConfigState snapshotWithKey = new ClusterConfigState(2L, staleSessionKey, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
        expectConfigRefreshAndSnapshot(snapshotWithKey);
        member.poll(Long.MAX_VALUE);
        EasyMock.expectLastCall();

        // Third recorded rebalance names "member" / MEMBER_URL; a key write and a bounded poll are expected.
        expectRebalance(2L, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL, true);
        Capture<SessionKey> writtenKey = EasyMock.newCapture();
        configBackingStore.putSessionKey(EasyMock.capture(writtenKey));
        // Feed the written key straight back to the herder through its config update listener.
        EasyMock.expectLastCall().andAnswer(() -> {
            configUpdateListener.onSessionKeyUpdate(writtenKey.getValue());
            return null;
        });
        member.poll(EasyMock.leq(keyTtlMs));
        EasyMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        configUpdateListener.onSessionKeyUpdate(staleSessionKey);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Two-tick scenario with protocol version 2. The first recorded rebalance names "member" /
     * {@code MEMBER_URL} explicitly and is followed by a poll bounded by
     * {@code INTER_WORKER_KEY_TTL_MS_MS_DEFAULT}; the second uses the default leader arguments and
     * is followed by {@code member.poll(Long.MAX_VALUE)}.
     */
    @Test
    public void testKeyRotationDisabledWhenWorkerBecomesFollower() throws Exception {
        final long keyTtlMs = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;

        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 2);

        // First recorded rebalance: a session key stamped with the current mock time is in the snapshot.
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL, true);
        SecretKey hmacKey = EasyMock.mock(SecretKey.class);
        EasyMock.expect(hmacKey.getAlgorithm()).andReturn("HmacSHA256").anyTimes();
        EasyMock.expect(hmacKey.getEncoded()).andReturn(new byte[32]).anyTimes();
        SessionKey currentSessionKey = new SessionKey(hmacKey, time.milliseconds());
        ClusterConfigState snapshotWithKey = new ClusterConfigState(1L, currentSessionKey, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
        expectConfigRefreshAndSnapshot(snapshotWithKey);
        member.poll(EasyMock.leq(keyTtlMs));
        EasyMock.expectLastCall();

        // Second recorded rebalance: unbounded poll expected.
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        member.poll(Long.MAX_VALUE);
        EasyMock.expectLastCall();

        PowerMock.replayAll(hmacKey);
        configUpdateListener.onSessionKeyUpdate(currentSessionKey);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * With the member reporting protocol version 0, {@code putTaskConfigs} is called with a null
     * signature; the only expected interactions are one {@code member.wakeup()} and none on the
     * callback mock.
     */
    @Test
    public void testPutTaskConfigsSignatureNotRequiredV0() {
        Callback taskConfigCb = EasyMock.mock(Callback.class);

        member.wakeup();
        EasyMock.expectLastCall().once();
        EasyMock.expect(member.currentProtocolVersion()).andReturn((short) 0).anyTimes();
        PowerMock.replayAll(taskConfigCb);

        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, (InternalRequestSignature) null);

        PowerMock.verifyAll();
    }

    /**
     * With the member reporting protocol version 1, {@code putTaskConfigs} is called with a null
     * signature; the only expected interactions are one {@code member.wakeup()} and none on the
     * callback mock.
     */
    @Test
    public void testPutTaskConfigsSignatureNotRequiredV1() {
        Callback taskConfigCb = EasyMock.mock(Callback.class);

        member.wakeup();
        EasyMock.expectLastCall().once();
        EasyMock.expect(member.currentProtocolVersion()).andReturn((short) 1).anyTimes();
        PowerMock.replayAll(taskConfigCb);

        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, (InternalRequestSignature) null);

        PowerMock.verifyAll();
    }

    /**
     * With the member reporting protocol version 2, calling {@code putTaskConfigs} with a null
     * signature must complete the callback exactly once with a {@code BadRequestException} and a
     * null result.
     */
    @Test
    public void testPutTaskConfigsMissingRequiredSignature() {
        Callback taskConfigCb = EasyMock.mock(Callback.class);
        Capture<Throwable> errorCapture = Capture.newInstance();
        taskConfigCb.onCompletion(EasyMock.capture(errorCapture), EasyMock.eq((Object) null));
        EasyMock.expectLastCall().once();
        EasyMock.expect(member.currentProtocolVersion()).andReturn((short) 2).anyTimes();
        PowerMock.replayAll(taskConfigCb);

        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, (InternalRequestSignature) null);

        PowerMock.verifyAll();
        Assert.assertTrue(errorCapture.getValue() instanceof BadRequestException);
    }

    /**
     * With the member reporting protocol version 2, a signature whose key algorithm is
     * "HmacSHA489" must cause the callback to be completed exactly once with a
     * {@code BadRequestException} and a null result.
     */
    @Test
    public void testPutTaskConfigsDisallowedSignatureAlgorithm() {
        Callback taskConfigCb = EasyMock.mock(Callback.class);
        Capture<Throwable> errorCapture = Capture.newInstance();
        taskConfigCb.onCompletion(EasyMock.capture(errorCapture), EasyMock.eq((Object) null));
        EasyMock.expectLastCall().once();
        EasyMock.expect(member.currentProtocolVersion()).andReturn((short) 2).anyTimes();

        InternalRequestSignature signature = EasyMock.mock(InternalRequestSignature.class);
        EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA489").anyTimes();
        PowerMock.replayAll(taskConfigCb, signature);

        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);

        PowerMock.verifyAll();
        Assert.assertTrue(errorCapture.getValue() instanceof BadRequestException);
    }

    /**
     * With the member reporting protocol version 2 and a session key delivered to the herder, a
     * signature that uses "HmacSHA256" but reports itself invalid must cause the callback to be
     * completed exactly once with a {@code ConnectRestException} carrying HTTP status FORBIDDEN.
     */
    @Test
    public void testPutTaskConfigsInvalidSignature() {
        Callback taskConfigCb = EasyMock.mock(Callback.class);
        Capture<Throwable> errorCapture = Capture.newInstance();
        taskConfigCb.onCompletion(EasyMock.capture(errorCapture), EasyMock.eq((Object) null));
        EasyMock.expectLastCall().once();
        EasyMock.expect(member.currentProtocolVersion()).andReturn((short) 2).anyTimes();

        // Signature with an accepted algorithm that never validates.
        InternalRequestSignature signature = EasyMock.mock(InternalRequestSignature.class);
        EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
        EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(false).anyTimes();

        // Session key handed to the herder before the request is made.
        SessionKey mockSessionKey = EasyMock.mock(SessionKey.class);
        SecretKey mockSecretKey = EasyMock.niceMock(SecretKey.class);
        EasyMock.expect(mockSessionKey.key()).andReturn(mockSecretKey);
        EasyMock.expect(mockSessionKey.creationTimestamp()).andReturn(time.milliseconds());
        PowerMock.replayAll(taskConfigCb, signature, mockSessionKey, mockSecretKey);

        configUpdateListener.onSessionKeyUpdate(mockSessionKey);
        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);

        PowerMock.verifyAll();
        Throwable error = errorCapture.getValue();
        Assert.assertTrue(error instanceof ConnectRestException);
        Assert.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), ((ConnectRestException) error).statusCode());
    }

    /**
     * With the member reporting protocol version 2 and no session key ever delivered to the
     * herder, even a signature that reports itself valid must cause the callback to be completed
     * exactly once with a {@code ConnectRestException} carrying HTTP status SERVICE_UNAVAILABLE.
     */
    @Test
    public void putTaskConfigsWorkerStillStarting() {
        Callback taskConfigCb = EasyMock.mock(Callback.class);
        Capture<Throwable> errorCapture = Capture.newInstance();
        taskConfigCb.onCompletion(EasyMock.capture(errorCapture), EasyMock.eq((Object) null));
        EasyMock.expectLastCall().once();
        EasyMock.expect(member.currentProtocolVersion()).andReturn((short) 2).anyTimes();

        InternalRequestSignature signature = EasyMock.mock(InternalRequestSignature.class);
        EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
        EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(true).anyTimes();
        PowerMock.replayAll(taskConfigCb, signature);

        // Note: onSessionKeyUpdate is deliberately not invoked in this test.
        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);

        PowerMock.verifyAll();
        Throwable error = errorCapture.getValue();
        Assert.assertTrue(error instanceof ConnectRestException);
        Assert.assertEquals(Response.Status.SERVICE_UNAVAILABLE.getStatusCode(), ((ConnectRestException) error).statusCode());
    }

    /**
     * With the member reporting protocol version 2, a session key delivered to the herder, and a
     * "HmacSHA256" signature that reports itself valid, {@code putTaskConfigs} is expected to wake
     * the member once and never touch the callback mock.
     */
    @Test
    public void testPutTaskConfigsValidRequiredSignature() {
        Callback taskConfigCb = EasyMock.mock(Callback.class);
        member.wakeup();
        EasyMock.expectLastCall().once();
        EasyMock.expect(member.currentProtocolVersion()).andReturn((short) 2).anyTimes();

        InternalRequestSignature signature = EasyMock.mock(InternalRequestSignature.class);
        EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
        EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(true).anyTimes();

        // Session key handed to the herder before the request is made.
        SessionKey mockSessionKey = EasyMock.mock(SessionKey.class);
        SecretKey mockSecretKey = EasyMock.niceMock(SecretKey.class);
        EasyMock.expect(mockSessionKey.key()).andReturn(mockSecretKey);
        EasyMock.expect(mockSessionKey.creationTimestamp()).andReturn(time.milliseconds());
        PowerMock.replayAll(taskConfigCb, signature, mockSessionKey, mockSecretKey);

        configUpdateListener.onSessionKeyUpdate(mockSessionKey);
        herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);

        PowerMock.verifyAll();
    }

    /**
     * Two-tick scenario as "leader" with protocol version 2: the first recorded
     * {@code putSessionKey} throws a {@code ConnectException}, and a second {@code putSessionKey}
     * call is expected afterwards (following another config refresh/snapshot and
     * {@code ensureActive}).
     */
    @Test
    public void testFailedToWriteSessionKey() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 2);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);

        // First write attempt fails.
        configBackingStore.putSessionKey(EasyMock.anyObject(SessionKey.class));
        EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));

        // A second write attempt is expected and succeeds.
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        member.ensureActive();
        PowerMock.expectLastCall();
        configBackingStore.putSessionKey(EasyMock.anyObject(SessionKey.class));
        EasyMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Two-tick scenario as "leader" with protocol version 2: the only recorded
     * {@code putSessionKey} throws a {@code ConnectException}, but the following config refresh
     * delivers a session key through the config update listener and the snapshot contains that
     * key. No second {@code putSessionKey} expectation is recorded.
     */
    @Test
    public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception {
        // Session key (stamped with the current mock time) that shows up in the config log.
        SecretKey hmacKey = EasyMock.niceMock(SecretKey.class);
        EasyMock.expect(hmacKey.getAlgorithm()).andReturn("HmacSHA256");
        EasyMock.expect(hmacKey.getEncoded()).andReturn(new byte[32]);
        SessionKey persistedKey = new SessionKey(hmacKey, time.milliseconds());
        ClusterConfigState snapshotWithKey = new ClusterConfigState(1L, persistedKey, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());

        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn((short) 2);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);

        // The write attempt fails from the herder's point of view.
        configBackingStore.putSessionKey(EasyMock.anyObject(SessionKey.class));
        EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));

        // The next refresh surfaces the key via the listener, and the snapshot includes it.
        configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
        EasyMock.expectLastCall().andAnswer(() -> {
            configUpdateListener.onSessionKeyUpdate(persistedKey);
            return null;
        });
        EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithKey);
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll(hmacKey);
        herder.tick();
        herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * With the member reporting protocol version 2 and a session key delivered to the herder,
     * calling {@code fenceZombieSourceTasks} with a "HmacSHA256" signature that reports itself
     * invalid must complete the callback exactly once with a {@code ConnectRestException}
     * carrying HTTP status FORBIDDEN.
     */
    @Test
    public void testFenceZombiesInvalidSignature() {
        Callback fencingCb = EasyMock.mock(Callback.class);
        Capture<Throwable> errorCapture = Capture.newInstance();
        fencingCb.onCompletion(EasyMock.capture(errorCapture), EasyMock.eq((Object) null));
        EasyMock.expectLastCall().once();
        EasyMock.expect(member.currentProtocolVersion()).andReturn((short) 2).anyTimes();

        // Signature with an accepted algorithm that never validates.
        InternalRequestSignature signature = EasyMock.mock(InternalRequestSignature.class);
        EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
        EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(false).anyTimes();

        // Session key handed to the herder before the request is made.
        SessionKey mockSessionKey = EasyMock.mock(SessionKey.class);
        SecretKey mockSecretKey = EasyMock.niceMock(SecretKey.class);
        EasyMock.expect(mockSessionKey.key()).andReturn(mockSecretKey);
        EasyMock.expect(mockSessionKey.creationTimestamp()).andReturn(time.milliseconds());
        PowerMock.replayAll(fencingCb, signature, mockSessionKey, mockSecretKey);

        configUpdateListener.onSessionKeyUpdate(mockSessionKey);
        herder.fenceZombieSourceTasks(CONN1, fencingCb, signature);

        PowerMock.verifyAll();
        Throwable error = errorCapture.getValue();
        Assert.assertTrue(error instanceof ConnectRestException);
        Assert.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), ((ConnectRestException) error).statusCode());
    }

    /** Runs the forward-to-leader scenario with a forwarded request that succeeds. */
    @Test
    public void testTaskRequestedZombieFencingForwardedToLeader() throws Exception {
        final boolean forwardSucceeds = true;
        testTaskRequestedZombieFencingForwardingToLeader(forwardSucceeds);
    }

    /** Runs the forward-to-leader scenario with a forwarded request that fails. */
    @Test
    public void testTaskRequestedZombieFencingFailedForwardToLeader() throws Exception {
        final boolean forwardSucceeds = false;
        testTaskRequestedZombieFencingForwardingToLeader(forwardSucceeds);
    }

    /**
     * Shared scenario for task-requested zombie fencing on a worker whose member id is
     * "member" while the assignment names "leader" as leader, so the request is forwarded
     * through the REST client using a {@code PUT}.
     *
     * @param z {@code true} if the mocked REST call should succeed (returns {@code null});
     *          {@code false} if it should throw a 409 {@link ConnectRestException}, in which
     *          case the fencing callback must fail with a {@link ConnectException} cause
     * @throws Exception if the herder cannot be stopped cleanly or the callback times out
     */
    private void testTaskRequestedZombieFencingForwardingToLeader(boolean z) throws Exception {
        expectHerderStartup();

        // Replace the forwarding executor with a mock that runs submitted work inline.
        ExecutorService executorService = (ExecutorService) EasyMock.mock(ExecutorService.class);
        this.herder.forwardRequestExecutor = executorService;

        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 2);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectAnyTicks();
        this.member.wakeup();
        EasyMock.expectLastCall();

        // The forwarded REST request either succeeds or is rejected with a conflict.
        IExpectationSetters expect = EasyMock.expect(this.restClient.httpRequest((String) EasyMock.anyObject(), (String) EasyMock.eq("PUT"), (HttpHeaders) EasyMock.isNull(), EasyMock.isNull(), (TypeReference) EasyMock.isNull(), (SecretKey) EasyMock.anyObject(), (String) EasyMock.anyObject()));
        if (z) {
            expect.andReturn((Object) null);
        } else {
            expect.andThrow(new ConnectRestException(409, "Rebalance :("));
        }

        Capture newCapture = EasyMock.newCapture();
        executorService.execute((Runnable) EasyMock.capture(newCapture));
        EasyMock.expectLastCall().andAnswer(() -> {
            ((Runnable) newCapture.getValue()).run();
            return null;
        });

        expectHerderShutdown(true);
        executorService.shutdown();
        EasyMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(executorService.awaitTermination(EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject()))).andReturn(true);

        PowerMock.replayAll(new Object[]{executorService});
        startBackgroundHerder();

        FutureCallback futureCallback = new FutureCallback();
        this.herder.fenceZombieSourceTasks(TASK1, futureCallback);
        if (z) {
            futureCallback.get(10L, TimeUnit.SECONDS);
        } else {
            // The lambda must actually await the callback; an empty lambda can never throw
            // and would make assertThrows fail unconditionally.
            ExecutionException fencingFailure = Assert.assertThrows(ExecutionException.class,
                    () -> futureCallback.get(10L, TimeUnit.SECONDS));
            Assert.assertTrue(fencingFailure.getCause() instanceof ConnectException);
        }

        stopBackgroundHerder();
        PowerMock.verifyAll();
    }

    /**
     * External fencing request against a snapshot with no connectors pending fencing; no
     * task count record write is expected (second helper argument is {@code false}).
     */
    @Test
    public void testExternalZombieFencingRequestForAlreadyFencedConnector() throws Exception {
        SessionKey sessionKey = expectNewSessionKey();
        Map<String, Integer> taskCountRecords = Collections.singletonMap(CONN1, 12);
        Map<String, Integer> taskConfigGenerations = Collections.singletonMap(CONN1, 5);
        Set<String> pendingFencing = Collections.emptySet();
        ClusterConfigState snapshot = exactlyOnceSnapshot(sessionKey, TASK_CONFIGS_MAP, taskCountRecords, taskConfigGenerations, pendingFencing);
        testExternalZombieFencingRequestThatRequiresNoPhysicalFencing(snapshot, false);
    }

    /**
     * External fencing request for a connector with a single task config and a recorded
     * task count of 1; a task count record write is expected (second helper argument is
     * {@code true}).
     */
    @Test
    public void testExternalZombieFencingRequestForSingleTaskConnector() throws Exception {
        SessionKey sessionKey = expectNewSessionKey();
        Map<ConnectorTaskId, Map<String, String>> singleTaskConfig = Collections.singletonMap(TASK1, TASK_CONFIG);
        Map<String, Integer> taskCountRecords = Collections.singletonMap(CONN1, 1);
        Map<String, Integer> taskConfigGenerations = Collections.singletonMap(CONN1, 5);
        Set<String> pendingFencing = Collections.singleton(CONN1);
        ClusterConfigState snapshot = exactlyOnceSnapshot(sessionKey, singleTaskConfig, taskCountRecords, taskConfigGenerations, pendingFencing);
        testExternalZombieFencingRequestThatRequiresNoPhysicalFencing(snapshot, true);
    }

    /**
     * External fencing request for a connector that has no task count record at all; a
     * task count record write is expected (second helper argument is {@code true}).
     */
    @Test
    public void testExternalZombieFencingRequestForFreshConnector() throws Exception {
        SessionKey sessionKey = expectNewSessionKey();
        Map<String, Integer> noTaskCountRecords = Collections.emptyMap();
        Map<String, Integer> taskConfigGenerations = Collections.singletonMap(CONN1, 5);
        Set<String> pendingFencing = Collections.singleton(CONN1);
        ClusterConfigState snapshot = exactlyOnceSnapshot(sessionKey, TASK_CONFIGS_MAP, noTaskCountRecords, taskConfigGenerations, pendingFencing);
        testExternalZombieFencingRequestThatRequiresNoPhysicalFencing(snapshot, true);
    }

    /**
     * Shared scenario: the herder runs as leader, receives an external fencing request for
     * {@code CONN1}, and must complete it without any call to the worker's fencing logic
     * (no such expectation is recorded here).
     *
     * @param clusterConfigState snapshot returned by the config store after the initial one
     * @param z whether a {@code putTaskCountRecord(CONN1, 1)} write is expected
     * @throws Exception if the callback does not complete within ten seconds or the herder
     *                   fails to stop
     */
    private void testExternalZombieFencingRequestThatRequiresNoPhysicalFencing(ClusterConfigState clusterConfigState, boolean z) throws Exception {
        expectHerderStartup();

        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 2);

        expectConfigRefreshAndSnapshot(SNAPSHOT);
        List<String> noConnectors = Collections.emptyList();
        List<ConnectorTaskId> noTasks = Collections.emptyList();
        expectRebalance(1L, noConnectors, noTasks, true);

        expectAnyTicks();
        this.member.wakeup();
        EasyMock.expectLastCall().anyTimes();

        expectConfigRefreshAndSnapshot(clusterConfigState);
        if (z) {
            this.configBackingStore.putTaskCountRecord(CONN1, 1);
            EasyMock.expectLastCall();
        }

        expectHerderShutdown(false);
        PowerMock.replayAll();

        startBackgroundHerder();
        FutureCallback fencing = new FutureCallback();
        this.herder.fenceZombieSourceTasks(CONN1, fencing);
        fencing.get(10L, TimeUnit.SECONDS);
        stopBackgroundHerder();

        PowerMock.verifyAll();
    }

    /**
     * External fencing request where the mocked worker fencing future completes
     * synchronously: every callback registered on the futures is invoked as soon as it is
     * attached, after which a task count record write for {@code CONN1} is expected.
     */
    @Test
    public void testExternalZombieFencingRequestImmediateCompletion() throws Exception {
        expectHerderStartup();

        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 2);

        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        SessionKey sessionKey = expectNewSessionKey();

        expectAnyTicks();
        this.member.wakeup();
        EasyMock.expectLastCall();

        ClusterConfigState configState = exactlyOnceSnapshot(
                sessionKey,
                TASK_CONFIGS_MAP,
                Collections.singletonMap(CONN1, 2),
                Collections.singletonMap(CONN1, 5),
                Collections.singleton(CONN1));
        expectConfigRefreshAndSnapshot(configState);

        KafkaFuture workerFencingFuture = (KafkaFuture) EasyMock.mock(KafkaFuture.class);
        KafkaFuture herderFencingFuture = (KafkaFuture) EasyMock.mock(KafkaFuture.class);

        // Two whenComplete registrations are expected; each handler is fired immediately.
        final int expectedRegistrations = 2;
        for (int registration = 0; registration < expectedRegistrations; registration++) {
            Capture completionHandler = EasyMock.newCapture();
            EasyMock.expect(herderFencingFuture.whenComplete((KafkaFuture.BiConsumer) EasyMock.capture(completionHandler))).andAnswer(() -> {
                ((KafkaFuture.BiConsumer) completionHandler.getValue()).accept((Object) null, (Object) null);
                return null;
            });
        }

        // thenApply runs the follow-up function right away and yields the second future.
        Capture followUp = EasyMock.newCapture();
        EasyMock.expect(workerFencingFuture.thenApply((KafkaFuture.BaseFunction) EasyMock.capture(followUp))).andAnswer(() -> {
            ((KafkaFuture.BaseFunction) followUp.getValue()).apply((Object) null);
            return herderFencingFuture;
        });

        EasyMock.expect(this.worker.fenceZombies((String) EasyMock.eq(CONN1), EasyMock.eq(2), (Map) EasyMock.eq(CONN1_CONFIG))).andReturn(workerFencingFuture);

        expectConfigRefreshAndSnapshot(configState);
        this.configBackingStore.putTaskCountRecord(CONN1, 1);
        EasyMock.expectLastCall();

        expectHerderShutdown(true);
        PowerMock.replayAll(workerFencingFuture, herderFencingFuture);

        startBackgroundHerder();
        FutureCallback fencing = new FutureCallback();
        this.herder.fenceZombieSourceTasks(CONN1, fencing);
        fencing.get(10L, TimeUnit.SECONDS);
        stopBackgroundHerder();

        PowerMock.verifyAll();
    }

    /**
     * External fencing request where {@code worker.fenceZombies} throws synchronously.
     * The fencing callback must fail with an {@link ExecutionException} whose cause is the
     * exact {@link KafkaException} thrown by the worker.
     */
    @Test
    public void testExternalZombieFencingRequestSynchronousFailure() throws Exception {
        expectHerderStartup();
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 2);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        SessionKey expectNewSessionKey = expectNewSessionKey();
        expectAnyTicks();
        this.member.wakeup();
        EasyMock.expectLastCall();
        expectConfigRefreshAndSnapshot(exactlyOnceSnapshot(expectNewSessionKey, TASK_CONFIGS_MAP, Collections.singletonMap(CONN1, 2), Collections.singletonMap(CONN1, 5), Collections.singleton(CONN1)));

        // The worker rejects the fencing attempt immediately.
        KafkaException kafkaException = new KafkaException("whoops!");
        EasyMock.expect(this.worker.fenceZombies((String) EasyMock.eq(CONN1), EasyMock.eq(2), (Map) EasyMock.eq(CONN1_CONFIG))).andThrow(kafkaException);

        expectHerderShutdown(true);
        PowerMock.replayAll(new Object[0]);
        startBackgroundHerder();

        FutureCallback futureCallback = new FutureCallback();
        this.herder.fenceZombieSourceTasks(CONN1, futureCallback);

        // Await the callback inside assertThrows; an empty lambda can never throw and
        // would make the assertion fail regardless of herder behavior.
        ExecutionException fencingFailure = Assert.assertThrows(ExecutionException.class,
                () -> futureCallback.get(10L, TimeUnit.SECONDS));
        Assert.assertEquals(kafkaException, fencingFailure.getCause());

        stopBackgroundHerder();
        PowerMock.verifyAll();
    }

    /**
     * External fencing request where the worker returns a future that later completes
     * exceptionally. After both completion handlers have been registered, the test fires
     * them with an {@link AuthorizationException}; the fencing callback must then fail with
     * an {@link ExecutionException} whose cause is a {@link ConnectException}.
     */
    @Test
    public void testExternalZombieFencingRequestAsynchronousFailure() throws Exception {
        expectHerderStartup();
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 2);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        SessionKey expectNewSessionKey = expectNewSessionKey();
        expectAnyTicks();
        this.member.wakeup();
        EasyMock.expectLastCall();
        expectConfigRefreshAndSnapshot(exactlyOnceSnapshot(expectNewSessionKey, TASK_CONFIGS_MAP, Collections.singletonMap(CONN1, 2), Collections.singletonMap(CONN1, 5), Collections.singleton(CONN1)));

        KafkaFuture kafkaFuture = (KafkaFuture) EasyMock.mock(KafkaFuture.class);
        KafkaFuture kafkaFuture2 = (KafkaFuture) EasyMock.mock(KafkaFuture.class);
        // Typed capture so the recorded handlers can be invoked without raw-type lambdas.
        Capture<KafkaFuture.BiConsumer<Object, Throwable>> newCapture = EasyMock.newCapture(CaptureType.ALL);
        EasyMock.expect(this.worker.fenceZombies((String) EasyMock.eq(CONN1), EasyMock.eq(2), (Map) EasyMock.eq(CONN1_CONFIG))).andReturn(kafkaFuture);
        EasyMock.expect(kafkaFuture.thenApply((KafkaFuture.BaseFunction) EasyMock.anyObject())).andReturn(kafkaFuture2);

        // Count down once per whenComplete registration so the test knows when both
        // handlers have been captured.
        CountDownLatch countDownLatch = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            EasyMock.expect(kafkaFuture2.whenComplete(EasyMock.capture(newCapture))).andAnswer(() -> {
                countDownLatch.countDown();
                return null;
            });
        }

        expectHerderShutdown(true);
        PowerMock.replayAll(new Object[]{kafkaFuture, kafkaFuture2});
        startBackgroundHerder();

        FutureCallback futureCallback = new FutureCallback();
        this.herder.fenceZombieSourceTasks(CONN1, futureCallback);
        Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));

        // Complete the fencing exceptionally through every captured handler.
        AuthorizationException authorizationException = new AuthorizationException("you didn't say the magic word");
        newCapture.getValues().forEach(biConsumer -> biConsumer.accept(null, authorizationException));

        // Await the callback inside assertThrows; an empty lambda can never throw.
        ExecutionException fencingFailure = Assert.assertThrows(ExecutionException.class,
                () -> futureCallback.get(10L, TimeUnit.SECONDS));
        Assert.assertTrue(fencingFailure.getCause() instanceof ConnectException);

        stopBackgroundHerder();
        PowerMock.verifyAll();
    }

    /**
     * External fencing requests for three connectors ({@code CONN1}, {@code CONN2} and
     * "SourceC") where the mocked worker fencing futures are completed by the test itself,
     * after all requests have been submitted. Several requests are issued per connector and
     * every resulting callback must complete within ten seconds.
     */
    @Test
    public void testExternalZombieFencingRequestDelayedCompletion() throws Exception {
        // Per-connector count: passed as the last snapshot argument, used as the number of
        // fencing requests issued per connector, and as the expected task count record value.
        HashMap hashMap = new HashMap();
        hashMap.put(CONN1, 5);
        hashMap.put(CONN2, 3);
        hashMap.put("SourceC", 12);
        expectHerderStartup();
        EasyMock.expect(this.member.memberId()).andStubReturn("leader");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn((short) 2);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        SessionKey expectNewSessionKey = expectNewSessionKey();
        expectAnyTicks();
        this.member.wakeup();
        EasyMock.expectLastCall().anyTimes();
        // Third snapshot argument; its values are also the count expected in fenceZombies below.
        HashMap hashMap2 = new HashMap();
        hashMap2.put(CONN1, 2);
        hashMap2.put(CONN2, 3);
        hashMap2.put("SourceC", 5);
        // Fourth snapshot argument.
        HashMap hashMap3 = new HashMap();
        hashMap3.put(CONN1, 3);
        hashMap3.put(CONN2, 4);
        hashMap3.put("SourceC", 2);
        ClusterConfigState exactlyOnceSnapshot = exactlyOnceSnapshot(expectNewSessionKey, TASK_CONFIGS_MAP, hashMap2, hashMap3, new HashSet(Arrays.asList(CONN1, CONN2, "SourceC")), hashMap);
        // One config refresh + snapshot per connector.
        hashMap.keySet().forEach(str -> {
            expectConfigRefreshAndSnapshot(exactlyOnceSnapshot);
        });
        // Keyed by connector: captured whenComplete handlers, captured thenApply function,
        // and a latch released once thenApply has been invoked.
        HashMap hashMap4 = new HashMap();
        HashMap hashMap5 = new HashMap();
        HashMap hashMap6 = new HashMap();
        hashMap.forEach((str2, num) -> {
            KafkaFuture kafkaFuture = (KafkaFuture) EasyMock.mock(KafkaFuture.class);
            KafkaFuture kafkaFuture2 = (KafkaFuture) EasyMock.mock(KafkaFuture.class);
            Capture newCapture = EasyMock.newCapture(CaptureType.ALL);
            hashMap4.put(str2, newCapture);
            // One more whenComplete registration than the per-connector count is expected.
            EasyMock.expect(kafkaFuture2.whenComplete((KafkaFuture.BiConsumer) EasyMock.capture(newCapture))).andReturn((Object) null).times(num.intValue() + 1);
            Capture newCapture2 = EasyMock.newCapture();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            hashMap5.put(str2, newCapture2);
            hashMap6.put(str2, countDownLatch);
            // thenApply only records the function and releases the latch; it is applied later.
            EasyMock.expect(kafkaFuture.thenApply((KafkaFuture.BaseFunction) EasyMock.capture(newCapture2))).andAnswer(() -> {
                countDownLatch.countDown();
                return kafkaFuture2;
            });
            EasyMock.expect(this.worker.fenceZombies((String) EasyMock.eq(str2), ((Integer) EasyMock.eq(hashMap2.get(str2))).intValue(), (Map) EasyMock.anyObject())).andReturn(kafkaFuture);
            for (int i = 0; i < num.intValue(); i++) {
                expectConfigRefreshAndSnapshot(exactlyOnceSnapshot);
            }
            // These futures are replayed here directly rather than handed to replayAll.
            PowerMock.replay(new Object[]{kafkaFuture, kafkaFuture2});
        });
        // A task count record write is expected per connector with its count from hashMap.
        hashMap.forEach((str3, num2) -> {
            this.configBackingStore.putTaskCountRecord(str3, num2.intValue());
            EasyMock.expectLastCall();
        });
        expectHerderShutdown(false);
        PowerMock.replayAll(new Object[0]);
        startBackgroundHerder();
        // Submit the fencing requests: as many callbacks per connector as its count.
        ArrayList arrayList = new ArrayList();
        hashMap.forEach((str4, num3) -> {
            List list = (List) IntStream.range(0, num3.intValue()).mapToObj(i -> {
                return new FutureCallback();
            }).collect(Collectors.toList());
            list.forEach(futureCallback -> {
                this.herder.fenceZombieSourceTasks(str4, futureCallback);
            });
            arrayList.addAll(list);
        });
        // Once thenApply has been reached for a connector, complete its fencing manually:
        // run the captured function, then fire every captured completion handler.
        hashMap6.forEach((str5, countDownLatch) -> {
            try {
                Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
                ((KafkaFuture.BaseFunction) ((Capture) hashMap5.get(str5)).getValue()).apply((Object) null);
                ((Capture) hashMap4.get(str5)).getValues().forEach(biConsumer -> {
                    biConsumer.accept((Object) null, (Object) null);
                });
            } catch (InterruptedException e) {
                Assert.fail("Unexpectedly interrupted");
            }
        });
        // Every submitted request must now complete.
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((FutureCallback) it.next()).get(10L, TimeUnit.SECONDS);
        }
        stopBackgroundHerder();
        PowerMock.verifyAll();
    }

    /**
     * Exercises {@code verifyTaskGenerationAndOwnership} against a config state whose
     * per-connector generation map is mutated between checks. With {@code TASK1} in the
     * assignment, the call is expected to succeed only when the requested generation equals
     * the stored one (0, 1 and 2 here); every other combination, and any call for a
     * {@code CONN2} task, must throw {@link ConnectException}.
     */
    @Test
    public void testVerifyTaskGeneration() {
        // Shared with the config state below, so later put() calls are visible to the herder.
        Map<String, Integer> generations = new HashMap<>();
        this.herder.configState = new ClusterConfigState(
                1L,
                (SessionKey) null,
                Collections.singletonMap(CONN1, 3),
                Collections.singletonMap(CONN1, CONN1_CONFIG),
                Collections.singletonMap(CONN1, TargetState.STARTED),
                TASK_CONFIGS_MAP,
                Collections.emptyMap(),
                generations,
                Collections.emptySet(),
                Collections.emptySet());

        Callback callback = (Callback) EasyMock.mock(Callback.class);
        int recordedCompletions = 0;
        while (recordedCompletions < 5) {
            callback.onCompletion((Throwable) null, (Object) null);
            EasyMock.expectLastCall();
            recordedCompletions++;
        }
        PowerMock.replayAll();

        this.herder.assignment = new ExtendedAssignment((short) 2, (short) 0, "leader", "leaderUrl", 0L, Collections.emptySet(), Collections.singleton(TASK1), Collections.emptySet(), Collections.emptySet(), 0);

        // No generation stored yet: every requested generation is rejected.
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback));
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback));
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback));

        // Stored generation 0: only 0 is accepted.
        generations.put(CONN1, 0);
        this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback);
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback));
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback));

        // Stored generation 1: only 1 is accepted.
        generations.put(CONN1, 1);
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback));
        this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback);
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback));

        // Stored generation 2: only 2 is accepted.
        generations.put(CONN1, 2);
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback));
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback));
        this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback);

        // Stored generation 3: none of the requested generations match.
        generations.put(CONN1, 3);
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback));
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback));
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback));

        // A task of CONN2 is not in the assignment; rejected even with a stored generation.
        ConnectorTaskId unassignedTask = new ConnectorTaskId(CONN2, 0);
        generations.put(unassignedTask.connector(), 1);
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(unassignedTask, 0, callback));
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(unassignedTask, 1, callback));
        Assert.assertThrows(ConnectException.class, () -> this.herder.verifyTaskGenerationAndOwnership(unassignedTask, 2, callback));

        PowerMock.verifyAll();
    }

    /**
     * Checks {@code isPossibleExpiredKeyException}: it must be {@code false} for a plain
     * {@link RuntimeException}, for a {@code BadRequestException}, and for a FORBIDDEN
     * {@link ConnectRestException} paired with a timestamp two minutes in the past; it must
     * be {@code true} for a FORBIDDEN {@link ConnectRestException} paired with the current time.
     */
    @Test
    public void testKeyExceptionDetection() {
        final int forbidden = Response.Status.FORBIDDEN.getStatusCode();

        boolean genericFailure = this.herder.isPossibleExpiredKeyException(this.time.milliseconds(), new RuntimeException());
        Assert.assertFalse(genericFailure);

        boolean badRequest = this.herder.isPossibleExpiredKeyException(this.time.milliseconds(), new BadRequestException(""));
        Assert.assertFalse(badRequest);

        long twoMinutesAgo = this.time.milliseconds() - TimeUnit.MINUTES.toMillis(2L);
        boolean staleForbidden = this.herder.isPossibleExpiredKeyException(twoMinutesAgo, new ConnectRestException(forbidden, ""));
        Assert.assertFalse(staleForbidden);

        boolean freshForbidden = this.herder.isPossibleExpiredKeyException(this.time.milliseconds(), new ConnectRestException(forbidden, ""));
        Assert.assertTrue(freshForbidden);
    }

    // NOTE(review): this test has an empty body, so it always passes and asserts nothing
    // about inconsistent configs. Presumably a placeholder for coverage that is still
    // pending; confirm before relying on it.
    @Test
    public void testInconsistentConfigs() {
    }

    /**
     * Checks the naming of threads created by the herder's three internal executors, which
     * are read reflectively from the fields {@code herderExecutor},
     * {@code forwardRequestExecutor} and {@code startAndStopExecutor}.
     */
    @Test
    public void testThreadNames() {
        ThreadPoolExecutor herderPool = (ThreadPoolExecutor) Whitebox.getInternalState(this.herder, "herderExecutor");
        String herderThreadName = herderPool.getThreadFactory().newThread(EMPTY_RUNNABLE).getName();
        Assert.assertTrue(herderThreadName.startsWith(DistributedHerder.class.getSimpleName()));

        ThreadPoolExecutor forwardPool = (ThreadPoolExecutor) Whitebox.getInternalState(this.herder, "forwardRequestExecutor");
        String forwardThreadName = forwardPool.getThreadFactory().newThread(EMPTY_RUNNABLE).getName();
        Assert.assertTrue(forwardThreadName.startsWith("ForwardRequestExecutor"));

        ThreadPoolExecutor startStopPool = (ThreadPoolExecutor) Whitebox.getInternalState(this.herder, "startAndStopExecutor");
        String startStopThreadName = startStopPool.getThreadFactory().newThread(EMPTY_RUNNABLE).getName();
        Assert.assertTrue(startStopThreadName.startsWith("StartAndStopExecutor"));
    }

    /**
     * Checks that {@code stopServices()} brings the {@code shutdownCalled} counter from
     * 1 down to 0.
     */
    @Test
    public void testHerderStopServicesClosesUponShutdown() {
        long pendingBeforeStop = this.shutdownCalled.getCount();
        Assert.assertEquals(1L, pendingBeforeStop);

        this.herder.stopServices();

        long pendingAfterStop = this.shutdownCalled.getCount();
        Assert.assertEquals(0L, pendingAfterStop);
    }

    /**
     * Records expectations for config decoration: {@code worker.configDecorator()} is
     * expected twice and returns the mock decorator, whose two decoration methods are set
     * up as pass-throughs that answer with the very argument captured from the call (the
     * connector config map and the validation result, respectively).
     */
    private void expectConfigDecoration() {
        EasyMock.expect(this.worker.configDecorator()).andReturn(this.decorator).times(2);

        // decorateConnectorConfig echoes back the config map it was given.
        // (The former stand-alone getClass() calls were no-op null checks on locals that
        // are never null; the method reference already null-checks its receiver.)
        Capture newCapture = EasyMock.newCapture();
        IExpectationSetters expect = EasyMock.expect(this.decorator.decorateConnectorConfig(EasyMock.anyString(), (Connector) EasyMock.anyObject(Connector.class), (ConfigDef) EasyMock.anyObject(ConfigDef.class), (Map) EasyMock.capture(newCapture)));
        expect.andAnswer(newCapture::getValue);

        // decorateValidationResult echoes back the ConfigInfos it was given.
        Capture newCapture2 = EasyMock.newCapture();
        IExpectationSetters expect2 = EasyMock.expect(this.decorator.decorateValidationResult(EasyMock.anyString(), (Connector) EasyMock.anyObject(Connector.class), (ConfigDef) EasyMock.anyObject(ConfigDef.class), (Map) EasyMock.anyObject(Map.class), (ConfigInfos) EasyMock.capture(newCapture2)));
        expect2.andAnswer(newCapture2::getValue);
    }

    /**
     * Drives three herder ticks in which connector start and task reconfiguration each
     * advance the mock clock by 10 seconds, and checks that every {@code member.poll} call
     * is made with a timeout of at most 10000. Each rebalance is set up with 20000 as its
     * last argument (presumably the scheduled rebalance delay; confirm against
     * {@code ExtendedAssignment}).
     */
    @Test
    public void testPollDurationOnSlowConnectorOperations() {
        this.connectProtocolVersion = (short) 1;
        EasyMock.expect(this.member.memberId()).andStubReturn("member");
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn(Short.valueOf(this.connectProtocolVersion));
        // First tick: CONN1 is assigned and started.
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList(), 20000);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Capture newCapture = EasyMock.newCapture();
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED), (Callback) EasyMock.capture(newCapture));
        PowerMock.expectLastCall().andAnswer(() -> {
            // Simulate a slow connector start before reporting success.
            this.time.sleep(10000L);
            ((Callback) newCapture.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });
        this.member.wakeup();
        PowerMock.expectLastCall();
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, () -> {
            return TASK_CONFIGS;
        });
        // The poll timeout must not exceed 10 seconds.
        this.member.poll(EasyMock.leq(10000L));
        PowerMock.expectLastCall();
        // Second tick: follows the connector config update; CONN1 is stopped and restarted.
        this.member.wakeup();
        PowerMock.expectLastCall();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList(), 20000);
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
        this.worker.stopAndAwaitConnector(CONN1);
        PowerMock.expectLastCall();
        EasyMock.expect(Short.valueOf(this.member.currentProtocolVersion())).andStubReturn(Short.valueOf(this.connectProtocolVersion));
        Capture newCapture2 = EasyMock.newCapture();
        this.worker.startConnector((String) EasyMock.eq(CONN1), (Map) EasyMock.anyObject(), (CloseableConnectorContext) EasyMock.anyObject(), (ConnectorStatus.Listener) EasyMock.eq(this.herder), (TargetState) EasyMock.eq(TargetState.STARTED), (Callback) EasyMock.capture(newCapture2));
        PowerMock.expectLastCall().andAnswer(() -> {
            // Slow restart, again 10 seconds of mock time.
            this.time.sleep(10000L);
            ((Callback) newCapture2.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        });
        this.member.wakeup();
        PowerMock.expectLastCall();
        this.member.poll(EasyMock.leq(10000L));
        PowerMock.expectLastCall();
        // Third tick: task reconfiguration with the updated config is itself slow.
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList(), 20000);
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfigUpdated, () -> {
            this.time.sleep(10000L);
            return TASK_CONFIGS;
        });
        this.member.poll(EasyMock.leq(10000L));
        PowerMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.herder.tick();
        this.configUpdateListener.onConnectorConfigUpdate(CONN1);
        this.herder.tick();
        this.herder.tick();
        PowerMock.verifyAll();
    }

    /**
     * Convenience overload: expects a rebalance without a
     * {@code claimWritePrivileges()} expectation.
     *
     * @param j     passed through as the assignment offset
     * @param list  connectors assigned by the rebalance
     * @param list2 tasks assigned by the rebalance
     */
    private void expectRebalance(long j, List<String> list, List<ConnectorTaskId> list2) {
        final boolean expectWritePrivilegeClaim = false;
        expectRebalance(j, list, list2, expectWritePrivilegeClaim);
    }

    /**
     * Convenience overload: expects a rebalance with nothing revoked, leader "leader" at
     * "leaderUrl", and zeros for the two numeric assignment fields.
     *
     * @param j     passed through as the assignment offset
     * @param list  connectors assigned by the rebalance
     * @param list2 tasks assigned by the rebalance
     * @param z     whether a {@code claimWritePrivileges()} call is expected
     */
    private void expectRebalance(long j, List<String> list, List<ConnectorTaskId> list2, boolean z) {
        List<String> noRevokedConnectors = Collections.emptyList();
        List<ConnectorTaskId> noRevokedTasks = Collections.emptyList();
        final short zeroCode = 0;
        final int zeroDelay = 0;
        expectRebalance(noRevokedConnectors, noRevokedTasks, zeroCode, j, "leader", "leaderUrl", list, list2, zeroDelay, z);
    }

    /**
     * Convenience overload: expects a rebalance with nothing revoked and a caller-chosen
     * leader, using zeros for the two numeric assignment fields.
     *
     * @param j     passed through as the assignment offset
     * @param list  connectors assigned by the rebalance
     * @param list2 tasks assigned by the rebalance
     * @param str   leader id placed in the assignment
     * @param str2  leader URL placed in the assignment
     * @param z     whether a {@code claimWritePrivileges()} call is expected
     */
    private void expectRebalance(long j, List<String> list, List<ConnectorTaskId> list2, String str, String str2, boolean z) {
        List<String> noRevokedConnectors = Collections.emptyList();
        List<ConnectorTaskId> noRevokedTasks = Collections.emptyList();
        final short zeroCode = 0;
        final int zeroDelay = 0;
        expectRebalance(noRevokedConnectors, noRevokedTasks, zeroCode, j, str, str2, list, list2, zeroDelay, z);
    }

    /**
     * Convenience overload: same as the seven-argument variant with 0 as the trailing
     * integer argument.
     *
     * @param collection connectors revoked by the rebalance
     * @param list       tasks revoked by the rebalance
     * @param s          short code placed in the assignment
     * @param j          passed through as the assignment offset
     * @param list2      connectors assigned by the rebalance
     * @param list3      tasks assigned by the rebalance
     */
    private void expectRebalance(Collection<String> collection, List<ConnectorTaskId> list, short s, long j, List<String> list2, List<ConnectorTaskId> list3) {
        final int zeroDelay = 0;
        expectRebalance(collection, list, s, j, list2, list3, zeroDelay);
    }

    /**
     * Convenience overload: uses leader "leader" at "leaderUrl" and does not expect a
     * {@code claimWritePrivileges()} call.
     *
     * @param collection connectors revoked by the rebalance
     * @param list       tasks revoked by the rebalance
     * @param s          short code placed in the assignment
     * @param j          passed through as the assignment offset
     * @param list2      connectors assigned by the rebalance
     * @param list3      tasks assigned by the rebalance
     * @param i          trailing integer placed in the assignment
     */
    private void expectRebalance(Collection<String> collection, List<ConnectorTaskId> list, short s, long j, List<String> list2, List<ConnectorTaskId> list3, int i) {
        final String leaderId = "leader";
        final String leaderUrl = "leaderUrl";
        final boolean expectWritePrivilegeClaim = false;
        expectRebalance(collection, list, s, j, leaderId, leaderUrl, list2, list3, i, expectWritePrivilegeClaim);
    }

    /**
     * Records the expectations for one rebalance. When {@code member.ensureActive()} is
     * called, the rebalance listener is notified of revocations (if any) and then of the
     * new assignment, and the mock clock advances by 100 ms. Follow-up expectations are
     * recorded for stopping revoked connectors/tasks, flushing the status store, and a
     * final {@code member.wakeup()}.
     *
     * @param collection connectors revoked by the rebalance
     * @param list       tasks revoked by the rebalance
     * @param s          short code placed in the assignment
     * @param j          assignment offset
     * @param str        leader id
     * @param str2       leader URL
     * @param list2      connectors assigned
     * @param list3      tasks assigned
     * @param i          trailing integer of the assignment; ignored (0 is used, and no
     *                   revocations are embedded) when the protocol version is 0
     * @param z          whether a {@code claimWritePrivileges()} call is expected
     */
    private void expectRebalance(Collection<String> collection, List<ConnectorTaskId> list, short s, long j, String str, String str2, List<String> list2, List<ConnectorTaskId> list3, int i, boolean z) {
        this.member.ensureActive();
        PowerMock.expectLastCall().andAnswer(() -> {
            boolean nothingRevoked = collection.isEmpty() && list.isEmpty();
            if (!nothingRevoked) {
                this.rebalanceListener.onRevoked(str, collection, list);
            }

            ExtendedAssignment assignment;
            if (this.connectProtocolVersion == 0) {
                assignment = new ExtendedAssignment(this.connectProtocolVersion, s, str, str2, j, list2, list3, Collections.emptyList(), Collections.emptyList(), 0);
            } else {
                assignment = new ExtendedAssignment(this.connectProtocolVersion, s, str, str2, j, list2, list3, new ArrayList<>(collection), new ArrayList<>(list), i);
            }
            this.rebalanceListener.onAssigned(assignment, 3);

            this.time.sleep(100L);
            return null;
        });

        if (z) {
            this.configBackingStore.claimWritePrivileges();
            EasyMock.expectLastCall();
        }

        // One stop expectation per revoked connector (none when the collection is empty).
        for (String revokedConnector : collection) {
            this.worker.stopAndAwaitConnector(revokedConnector);
            PowerMock.expectLastCall();
        }

        // A single stop expectation covers the revoked tasks, whatever their number.
        boolean tasksRevoked = !list.isEmpty();
        if (tasksRevoked) {
            this.worker.stopAndAwaitTask((ConnectorTaskId) EasyMock.anyObject(ConnectorTaskId.class));
            PowerMock.expectLastCall();
        }

        boolean connectorsRevoked = !collection.isEmpty();
        if (connectorsRevoked) {
            this.statusBackingStore.flush();
            PowerMock.expectLastCall();
        }

        this.member.wakeup();
        PowerMock.expectLastCall();
    }

    /**
     * Builds a snapshot via the six-argument overload, deriving its last argument: every
     * connector named in {@code map2}, {@code map3} or {@code set} is mapped to 1.
     *
     * @param sessionKey session key stored in the snapshot
     * @param map        task configs by task id
     * @param map2       first per-connector integer map, forwarded unchanged
     * @param map3       second per-connector integer map, forwarded unchanged
     * @param set        connector-name set, forwarded unchanged
     * @return the resulting cluster config state
     */
    private ClusterConfigState exactlyOnceSnapshot(SessionKey sessionKey, Map<ConnectorTaskId, Map<String, String>> map, Map<String, Integer> map2, Map<String, Integer> map3, Set<String> set) {
        Set<String> connectorNames = new HashSet<>();
        connectorNames.addAll(map2.keySet());
        connectorNames.addAll(map3.keySet());
        connectorNames.addAll(set);

        Map<String, Integer> oneEach = connectorNames.stream()
                .collect(Collectors.toMap(Function.identity(), connectorName -> 1));
        return exactlyOnceSnapshot(sessionKey, map, map2, map3, set, oneEach);
    }

    /**
     * Builds a {@link ClusterConfigState} with offset 1 in which every connector named in
     * any of the supplied maps or the set is given {@code CONN1_CONFIG} as its config, and
     * only {@code CONN1} has a target state (STARTED).
     *
     * @param sessionKey session key stored in the snapshot
     * @param map        task configs by task id
     * @param map2       first per-connector integer map, forwarded unchanged
     * @param map3       second per-connector integer map, forwarded unchanged
     * @param set        connector-name set, forwarded unchanged
     * @param map4       per-connector integer map passed as the third constructor argument
     * @return the resulting cluster config state
     */
    private ClusterConfigState exactlyOnceSnapshot(SessionKey sessionKey, Map<ConnectorTaskId, Map<String, String>> map, Map<String, Integer> map2, Map<String, Integer> map3, Set<String> set, Map<String, Integer> map4) {
        Set<String> connectorNames = new HashSet<>();
        connectorNames.addAll(map4.keySet());
        connectorNames.addAll(map2.keySet());
        connectorNames.addAll(map3.keySet());
        connectorNames.addAll(set);

        return new ClusterConfigState(
                1L,
                sessionKey,
                map4,
                connectorNames.stream().collect(Collectors.toMap(Function.identity(), connectorName -> CONN1_CONFIG)),
                Collections.singletonMap(CONN1, TargetState.STARTED),
                map,
                map2,
                map3,
                set,
                Collections.emptySet());
    }

    /**
     * Records the expectations for a task reconfiguration of {@code CONN1}.
     *
     * @param z               value {@code worker.isRunning(CONN1)} should return; when
     *                        {@code false} no further expectations are recorded
     * @param connectorConfig config expected in the {@code connectorTaskConfigs} call
     * @param iAnswer         answer producing the task configs for that call
     */
    private void expectExecuteTaskReconfiguration(boolean z, ConnectorConfig connectorConfig, IAnswer<List<Map<String, String>>> iAnswer) {
        EasyMock.expect(Boolean.valueOf(this.worker.isRunning(CONN1))).andReturn(Boolean.valueOf(z));
        if (!z) {
            // A connector that is not running is never asked for task configs.
            return;
        }
        EasyMock.expect(this.worker.getPlugins()).andReturn(this.plugins);
        EasyMock.expect(this.worker.connectorTaskConfigs(CONN1, connectorConfig)).andAnswer(iAnswer);
    }

    /**
     * Allows any number of {@code member.ensureActive()} and {@code member.poll(int)}
     * calls, so a test can let the herder run without counting its ticks.
     */
    private void expectAnyTicks() {
        this.member.ensureActive();
        EasyMock.expectLastCall().anyTimes();
        this.member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall().anyTimes();
    }

    /**
     * Creates a session key backed by a mocked HmacSHA256 secret (32 zero bytes) and
     * timestamped one day after the current mock time, and records an expectation that any
     * {@code putSessionKey} call on the config store delivers that key to the herder's
     * config update listener.
     *
     * @return the session key that will be delivered to the listener
     */
    private SessionKey expectNewSessionKey() {
        SecretKey mockedSecret = (SecretKey) EasyMock.niceMock(SecretKey.class);
        EasyMock.expect(mockedSecret.getAlgorithm()).andReturn("HmacSHA256").anyTimes();
        EasyMock.expect(mockedSecret.getEncoded()).andReturn(new byte[32]).anyTimes();

        long keyTimestamp = this.time.milliseconds() + TimeUnit.DAYS.toMillis(1L);
        SessionKey newKey = new SessionKey(mockedSecret, keyTimestamp);

        // Whatever key is written, the listener is told about newKey.
        this.configBackingStore.putSessionKey((SessionKey) EasyMock.anyObject(SessionKey.class));
        EasyMock.expectLastCall().andAnswer(() -> {
            this.configUpdateListener.onSessionKeyUpdate(newKey);
            return null;
        });

        EasyMock.replay(mockedSecret);
        return newKey;
    }

    /**
     * Starts the herder on a fresh single-thread executor, keeping both the executor and
     * the resulting future in fields.
     */
    private void startBackgroundHerder() {
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        this.herderExecutor = singleThread;
        this.herderFuture = singleThread.submit((Runnable) this.herder);
    }

    /**
     * Stops the herder, shuts down its executor, and asserts that the herder thread ends
     * within ten seconds. Finally calls {@code get()} on the herder future, which rethrows
     * any failure from the herder's run.
     *
     * @throws Exception if waiting is interrupted or the herder's run failed
     */
    private void stopBackgroundHerder() throws Exception {
        this.herder.stop();
        this.herderExecutor.shutdown();

        boolean finishedInTime = this.herderExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        Assert.assertTrue("herder thread did not finish in time", finishedInTime);

        this.herderFuture.get();
    }

    /**
     * Records the calls expected when the herder starts: {@code start()} on the worker,
     * the status backing store, and the config backing store.
     */
    private void expectHerderStartup() {
        this.worker.start();
        EasyMock.expectLastCall();
        this.statusBackingStore.start();
        EasyMock.expectLastCall();
        this.configBackingStore.start();
        EasyMock.expectLastCall();
    }

    /**
     * Records the calls expected when the herder shuts down: the worker reports no
     * connectors and no tasks, then the member, both backing stores, and the worker are
     * stopped.
     *
     * @param z whether one extra {@code member.wakeup()} call should be expected first
     */
    private void expectHerderShutdown(boolean z) {
        if (z) {
            this.member.wakeup();
            EasyMock.expectLastCall();
        }
        // Nothing is running, so shutdown has no connectors or tasks to stop.
        EasyMock.expect(this.worker.connectorNames()).andReturn(Collections.emptySet());
        EasyMock.expect(this.worker.taskIds()).andReturn(Collections.emptySet());
        this.member.stop();
        EasyMock.expectLastCall();
        this.statusBackingStore.stop();
        EasyMock.expectLastCall();
        this.configBackingStore.stop();
        EasyMock.expectLastCall();
        this.worker.stop();
        EasyMock.expectLastCall();
    }

    /**
     * Records one {@code configBackingStore.refresh(...)} call with any timeout, followed
     * by a {@code snapshot()} call returning the given state.
     *
     * @param clusterConfigState snapshot the config store should return
     */
    private void expectConfigRefreshAndSnapshot(ClusterConfigState clusterConfigState) {
        try {
            this.configBackingStore.refresh(EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject(TimeUnit.class));
            EasyMock.expectLastCall();
        } catch (TimeoutException e) {
            // refresh() declares TimeoutException; a mock in record state never throws it.
            Assert.fail("Mocked method should not throw checked exception");
        }
        EasyMock.expect(this.configBackingStore.snapshot()).andReturn(clusterConfigState);
    }

    /**
     * Convenience overload of the six-argument {@code assertStatistics}: expects the
     * "rebalancing" flag to be off, and a leader name of {@code "leaderUrl"} only when
     * {@code i2} is positive ({@code null} otherwise).
     *
     * @param i  expected value of the "epoch" metric
     * @param i2 expected value of the "completed-rebalances-total" metric
     * @param d  expected rebalance max/avg time; non-positive means NaN is expected
     * @param d2 expected value of the "time-since-last-rebalance-ms" metric
     */
    private void assertStatistics(int i, int i2, double d, double d2) {
        String expectedLeader = null;
        if (i2 > 0) {
            expectedLeader = "leaderUrl";
        }
        assertStatistics(expectedLeader, false, i, i2, d, d2);
    }

    /**
     * Reads the herder's rebalance metrics and compares them with the expected values,
     * using a tolerance of 1e-4 for numeric metrics.
     *
     * @param str expected value of the "leader-name" metric
     * @param z   whether the "rebalancing" metric is expected to be 1.0 (otherwise 0.0)
     * @param i   expected value of the "epoch" metric
     * @param i2  expected value of the "completed-rebalances-total" metric
     * @param d   expected value of both "rebalance-max-time-ms" and "rebalance-avg-time-ms";
     *            when not greater than zero, NaN is expected for both
     * @param d2  expected value of the "time-since-last-rebalance-ms" metric
     */
    private void assertStatistics(String str, boolean z, int i, int i2, double d, double d2) {
        final double delta = 1.0E-4d;
        ConnectMetrics.MetricGroup group = herder.herderMetrics().metricGroup();

        // All metrics are sampled before the first assertion runs.
        double epoch = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "epoch");
        String leaderName = MockConnectMetrics.currentMetricValueAsString(metrics, group, "leader-name");
        double completedRebalances = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "completed-rebalances-total");
        double rebalancing = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "rebalancing");
        double rebalanceMaxTime = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "rebalance-max-time-ms");
        double rebalanceAvgTime = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "rebalance-avg-time-ms");
        double sinceLastRebalance = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "time-since-last-rebalance-ms");

        double expectedRebalancing = z ? 1.0d : 0.0d;
        // A non-positive expected time means the max/avg metrics are expected to be NaN.
        double expectedRebalanceTime = d <= 0.0d ? Double.NaN : d;

        Assert.assertEquals(i, epoch, delta);
        Assert.assertEquals(str, leaderName);
        Assert.assertEquals(i2, completedRebalances, delta);
        Assert.assertEquals(expectedRebalancing, rebalancing, delta);
        Assert.assertEquals(d2, sinceLastRebalance, delta);
        Assert.assertEquals(expectedRebalanceTime, rebalanceMaxTime, delta);
        Assert.assertEquals(expectedRebalanceTime, rebalanceAvgTime, delta);
    }

    /**
     * A restart request whose plan construction throws must still be removed from the
     * pending queue, and the exception must not escape {@code processRestartRequests()}.
     */
    @Test
    public void processRestartRequestsFailureSuppression() {
        member.wakeup();
        PowerMock.expectLastCall().anyTimes();

        RestartRequest failingRequest = new RestartRequest(FOO_TOPIC, false, false);
        // Building a plan for this request always fails.
        EasyMock.expect(herder.buildRestartPlan(failingRequest)).andThrow(new RuntimeException()).anyTimes();

        PowerMock.replayAll();

        configUpdateListener.onRestartRequest(failingRequest);
        Assert.assertEquals(1L, herder.pendingRestartRequests.size());

        herder.processRestartRequests();
        Assert.assertTrue(herder.pendingRestartRequests.isEmpty());
    }

    /**
     * Two restart requests under different names are both queued, and a single
     * {@code processRestartRequests()} pass drains the queue.
     */
    @Test
    public void processRestartRequestsDequeue() {
        member.wakeup();
        PowerMock.expectLastCall().anyTimes();

        // No restart plan is produced for any request.
        EasyMock.expect(herder.buildRestartPlan(EasyMock.anyObject(RestartRequest.class)))
                .andReturn(Optional.empty())
                .anyTimes();

        PowerMock.replayAll();

        RestartRequest first = new RestartRequest(FOO_TOPIC, false, false);
        configUpdateListener.onRestartRequest(first);
        RestartRequest second = new RestartRequest(BAR_TOPIC, false, false);
        configUpdateListener.onRestartRequest(second);
        Assert.assertEquals(2L, herder.pendingRestartRequests.size());

        herder.processRestartRequests();
        Assert.assertTrue(herder.pendingRestartRequests.isEmpty());
    }

    /**
     * Several restart requests under the same name collapse into one pending entry.
     * After the second request that entry reports {@code onlyFailed() == false} and
     * {@code includeTasks() == true}, and the third request does not change it.
     */
    @Test
    public void preserveHighestImpactRestartRequest() {
        member.wakeup();
        PowerMock.expectLastCall().anyTimes();
        PowerMock.replayAll();

        // The second request differs from the first only in its last flag.
        configUpdateListener.onRestartRequest(new RestartRequest(FOO_TOPIC, false, false));
        configUpdateListener.onRestartRequest(new RestartRequest(FOO_TOPIC, false, true));
        Assert.assertEquals(1L, herder.pendingRestartRequests.size());
        RestartRequest pendingAfterSecond = (RestartRequest) herder.pendingRestartRequests.get(FOO_TOPIC);
        Assert.assertFalse(pendingAfterSecond.onlyFailed());
        Assert.assertTrue(pendingAfterSecond.includeTasks());

        // A third request with the flags swapped must leave the pending entry as it was.
        configUpdateListener.onRestartRequest(new RestartRequest(FOO_TOPIC, true, false));
        Assert.assertEquals(1L, herder.pendingRestartRequests.size());
        RestartRequest pendingAfterThird = (RestartRequest) herder.pendingRestartRequests.get(FOO_TOPIC);
        Assert.assertFalse(pendingAfterThird.onlyFailed());
        Assert.assertTrue(pendingAfterThird.includeTasks());
    }

    /**
     * Builds a partially mocked {@link DistributedHerder} from a copy of
     * {@code HERDER_CONFIG} with {@code exactly.once.source.support} set to {@code enabled}.
     *
     * @return the partial mock; only the methods named below are mocked
     */
    private DistributedHerder exactlyOnceHerder() {
        // Work on a copy so the shared HERDER_CONFIG is not modified.
        HashMap workerProps = new HashMap(HERDER_CONFIG);
        workerProps.put("exactly.once.source.support", "enabled");

        String[] mockedMethods = new String[]{
            "connectorType",
            "updateDeletedConnectorStatus",
            "updateDeletedTaskStatus",
            "validateConnectorConfig"
        };
        Object[] constructorArgs = new Object[]{
            new DistributedConfig(workerProps),
            worker,
            WORKER_ID,
            KAFKA_CLUSTER_ID,
            statusBackingStore,
            configBackingStore,
            member,
            MEMBER_URL,
            restClient,
            metrics,
            time,
            noneConnectorClientConfigOverridePolicy,
            new AutoCloseable[0]
        };
        return (DistributedHerder) PowerMock.createPartialMock(DistributedHerder.class, mockedMethods, constructorArgs);
    }

    // Populates the shared test fixtures. Statement order matters: later fixtures
    // (connector configs, task-config map, snapshots) are built from earlier ones.
    static {
        // Base worker configuration; exactlyOnceHerder() copies it and adds one override.
        HERDER_CONFIG.put("status.storage.topic", "status-topic");
        HERDER_CONFIG.put("config.storage.topic", "config-topic");
        HERDER_CONFIG.put("bootstrap.servers", "localhost:9092");
        HERDER_CONFIG.put("group.id", "connect-test-group");
        HERDER_CONFIG.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("offset.storage.topic", "connect-offsets");
        // Three task ids for CONN1 (indices 0..2), matching MAX_TASKS below.
        TASK0 = new ConnectorTaskId(CONN1, 0);
        TASK1 = new ConnectorTaskId(CONN1, 1);
        TASK2 = new ConnectorTaskId(CONN1, 2);
        MAX_TASKS = 3;
        // Configuration for the first connector.
        CONN1_CONFIG = new HashMap();
        CONN1_CONFIG.put("name", CONN1);
        CONN1_CONFIG.put("tasks.max", MAX_TASKS.toString());
        CONN1_CONFIG.put("topics", String.join(",", FOO_TOPIC, BAR_TOPIC));
        CONN1_CONFIG.put("connector.class", BogusSourceConnector.class.getName());
        // Same as CONN1_CONFIG, except that "topics" additionally lists BAZ_TOPIC.
        CONN1_CONFIG_UPDATED = new HashMap(CONN1_CONFIG);
        CONN1_CONFIG_UPDATED.put("topics", String.join(",", FOO_TOPIC, BAR_TOPIC, BAZ_TOPIC));
        // NOTE(review): the second ConfigInfos argument is presumably the error count - confirm.
        CONN1_CONFIG_INFOS = new ConfigInfos(CONN1, 0, Collections.emptyList(), Collections.emptyList());
        // Configuration for the second connector; same settings as CONN1_CONFIG apart from the name.
        CONN2_CONFIG = new HashMap();
        CONN2_CONFIG.put("name", CONN2);
        CONN2_CONFIG.put("tasks.max", MAX_TASKS.toString());
        CONN2_CONFIG.put("topics", String.join(",", FOO_TOPIC, BAR_TOPIC));
        CONN2_CONFIG.put("connector.class", BogusSourceConnector.class.getName());
        CONN2_CONFIG_INFOS = new ConfigInfos(CONN2, 0, Collections.emptyList(), Collections.emptyList());
        // Differs from CONN2_CONFIG_INFOS only in the second argument (1 instead of 0).
        CONN2_INVALID_CONFIG_INFOS = new ConfigInfos(CONN2, 1, Collections.emptyList(), Collections.emptyList());
        // A single task configuration, reused for every task.
        TASK_CONFIG = new HashMap();
        TASK_CONFIG.put("task.class", BogusSourceTask.class.getName());
        // The list holds the same TASK_CONFIG instance three times.
        TASK_CONFIGS = new ArrayList();
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
        // Every task id maps to that same TASK_CONFIG instance.
        TASK_CONFIGS_MAP = new HashMap<>();
        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
        // Three cluster snapshots for CONN1 with three tasks. They differ only in the
        // connector config map and the target state passed for CONN1:
        //   SNAPSHOT                      - CONN1_CONFIG,         TargetState.STARTED
        //   SNAPSHOT_PAUSED_CONN1         - CONN1_CONFIG,         TargetState.PAUSED
        //   SNAPSHOT_UPDATED_CONN1_CONFIG - CONN1_CONFIG_UPDATED, TargetState.STARTED
        SNAPSHOT = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
        SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.PAUSED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
        SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
        // A runnable that does nothing.
        EMPTY_RUNNABLE = () -> {
        };
    }
}
