package org.apache.kafka.connect.runtime.standalone;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.CloseableConnectorContext;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.RestartPlan;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.class */
public class StandaloneHerderTest {
    private static final String CONNECTOR_NAME = "test";
    private static final String TOPICS_LIST_STR = "topic1,topic2";
    private static final String WORKER_ID = "localhost:8083";
    private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
    private StandaloneHerder herder;
    private Connector connector;

    @Mock
    protected Worker worker;

    @Mock
    protected WorkerConfigTransformer transformer;

    @Mock
    private Plugins plugins;

    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private LoaderSwap loaderSwap;
    protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback;

    @Mock
    protected StatusBackingStore statusBackingStore;
    private final SampleConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy();

    /* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest$BogusSinkConnector.class */
    private abstract class BogusSinkConnector extends SinkConnector {
        private BogusSinkConnector() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest$BogusSinkTask.class */
    private abstract class BogusSinkTask extends SourceTask {
        private BogusSinkTask() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest$BogusSourceConnector.class */
    private abstract class BogusSourceConnector extends SourceConnector {
        private BogusSourceConnector() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest$BogusSourceTask.class */
    private abstract class BogusSourceTask extends SourceTask {
        private BogusSourceTask() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest$SourceSink.class */
    public enum SourceSink {
        SOURCE,
        SINK
    }

    @Before
    public void setup() throws ExecutionException, InterruptedException {
        this.worker = (Worker) Mockito.mock(Worker.class);
        this.herder = (StandaloneHerder) Mockito.mock(StandaloneHerder.class, Mockito.withSettings().useConstructor(new Object[]{this.worker, WORKER_ID, KAFKA_CLUSTER_ID, this.statusBackingStore, new MemoryConfigBackingStore(this.transformer), this.noneConnectorClientConfigOverridePolicy, new MockTime()}).defaultAnswer(Mockito.CALLS_REAL_METHODS));
        this.createCallback = new FutureCallback<>();
        this.plugins = (Plugins) Mockito.mock(Plugins.class);
        this.pluginLoader = (PluginClassLoader) Mockito.mock(PluginClassLoader.class);
        this.loaderSwap = (LoaderSwap) Mockito.mock(LoaderSwap.class);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Map.class);
        Mockito.when(this.transformer.transform((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) forClass.capture())).thenAnswer(invocationOnMock -> {
            return (Map) forClass.getValue();
        });
    }

    @After
    public void tearDown() {
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.statusBackingStore});
    }

    @Test
    public void testCreateSourceConnector() throws Exception {
        this.connector = (Connector) Mockito.mock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
    }

    @Test
    public void testCreateConnectorFailedValidation() {
        this.connector = (Connector) Mockito.mock(BogusSourceConnector.class);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        connectorConfig.remove("name");
        Connector connector = (Connector) Mockito.mock(SourceConnector.class);
        Mockito.when(this.worker.configTransformer()).thenReturn(this.transformer);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Map.class);
        Mockito.when(this.transformer.transform((Map) forClass.capture())).thenAnswer(invocationOnMock -> {
            return (Map) forClass.getValue();
        });
        Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
        Mockito.when(this.plugins.newConnector(ArgumentMatchers.anyString())).thenReturn(connector);
        Mockito.when(this.plugins.connectorLoader(ArgumentMatchers.anyString())).thenReturn(this.pluginLoader);
        Mockito.when(this.plugins.withClassLoader(this.pluginLoader)).thenReturn(this.loaderSwap);
        Mockito.when(connector.config()).thenReturn(new ConfigDef());
        Mockito.when(connector.validate(connectorConfig)).thenReturn(new Config(Collections.singletonList(new ConfigValue("foo.bar"))));
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        });
        if (BadRequestException.class != executionException.getCause().getClass()) {
            throw new AssertionError(executionException.getCause());
        }
        ((LoaderSwap) Mockito.verify(this.loaderSwap)).close();
    }

    @Test
    public void testCreateConnectorAlreadyExists() throws Throwable {
        this.connector = (Connector) Mockito.mock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig, connectorConfig);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, futureCallback);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        });
        if (AlreadyExistsException.class != executionException.getCause().getClass()) {
            throw new AssertionError(executionException.getCause());
        }
        ((LoaderSwap) Mockito.verify(this.loaderSwap, Mockito.times(2))).close();
    }

    @Test
    public void testCreateSinkConnector() throws Exception {
        this.connector = (Connector) Mockito.mock(BogusSinkConnector.class);
        expectAdd(SourceSink.SINK);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
        expectConfigValidation((Connector) Mockito.mock(SinkConnector.class), true, connectorConfig);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SINK), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
    }

    @Test
    public void testCreateConnectorWithStoppedInitialState() throws Exception {
        this.connector = (Connector) Mockito.mock(BogusSinkConnector.class);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
        Connector connector = (Connector) Mockito.mock(SinkConnector.class);
        expectConfigValidation(connector, false, connectorConfig);
        Mockito.when(this.plugins.newConnector(ArgumentMatchers.anyString())).thenReturn(connector);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STOPPED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig), (CloseableConnectorContext) ArgumentMatchers.any(HerderConnectorContext.class), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STOPPED), (Callback) forClass.capture());
        Mockito.when(Boolean.valueOf(this.worker.isRunning(CONNECTOR_NAME))).thenReturn(true);
        Mockito.when(this.herder.connectorType((Map) ArgumentMatchers.any())).thenReturn(ConnectorType.SINK);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, TargetState.STOPPED, false, this.createCallback);
        Assert.assertEquals(new ConnectorInfo(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        ((LoaderSwap) Mockito.verify(this.loaderSwap)).close();
    }

    @Test
    public void testDestroyConnector() throws Exception {
        this.connector = (Connector) Mockito.mock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        Mockito.when(this.statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList());
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        expectDestroy();
        this.herder.deleteConnectorConfig(CONNECTOR_NAME, futureCallback);
        ((StandaloneHerder) Mockito.verify(this.herder)).onDeletion(CONNECTOR_NAME);
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
        futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.deleteConnectorConfig(CONNECTOR_NAME, futureCallback2);
        Assert.assertTrue(((ExecutionException) Assert.assertThrows("Should have thrown NotFoundException", ExecutionException.class, () -> {
        })).getCause() instanceof NotFoundException);
    }

    @Test
    public void testRestartConnectorSameTaskConfigs() throws Exception {
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig), (CloseableConnectorContext) ArgumentMatchers.any(HerderConnectorContext.class), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        Mockito.when(this.worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
        Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
        Mockito.when(this.worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(this.plugins, connectorConfig, true))).thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnector(CONNECTOR_NAME, futureCallback);
        futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        ((Worker) Mockito.verify(this.worker)).stopAndAwaitConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME));
    }

    @Test
    public void testRestartConnectorNewTaskConfigs() throws Exception {
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig), (CloseableConnectorContext) ArgumentMatchers.any(HerderConnectorContext.class), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        Mockito.when(this.worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
        Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
        Map<String, String> taskConfig = taskConfig(SourceSink.SOURCE);
        taskConfig.put("k", "v");
        Mockito.when(this.worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(this.plugins, connectorConfig, true))).thenReturn(Collections.singletonList(taskConfig));
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.eq(connectorConfig(SourceSink.SOURCE)), (Map) ArgumentMatchers.eq(taskConfig), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        FutureCallback futureCallback = new FutureCallback();
        expectStop();
        this.herder.restartConnector(CONNECTOR_NAME, futureCallback);
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put(new TaskStatus(connectorTaskId, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
        futureCallback.get(1000L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testRestartConnectorFailureOnStart() throws Exception {
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONNECTOR_NAME);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ConnectException connectException = new ConnectException("Failed to start connector");
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion(connectException, (Object) null);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig), (CloseableConnectorContext) ArgumentMatchers.any(HerderConnectorContext.class), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        this.herder.restartConnector(CONNECTOR_NAME, futureCallback);
        try {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail();
        } catch (ExecutionException e) {
            Assert.assertEquals(connectException, e.getCause());
        }
    }

    @Test
    public void testRestartTask() throws Exception {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTask(connectorTaskId);
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask(connectorTaskId, new ClusterConfigState(-1L, (SessionKey) null, Collections.singletonMap(CONNECTOR_NAME, 1), Collections.singletonMap(CONNECTOR_NAME, connectorConfig), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(connectorTaskId, taskConfig(SourceSink.SOURCE)), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)), new HashSet(), new HashSet(), this.transformer), connectorConfig, taskConfig(SourceSink.SOURCE), this.herder, TargetState.STARTED))).thenReturn(true);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Herder.Created created = (Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS);
        this.createCallback.get(1000L, TimeUnit.SECONDS);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), created.result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(connectorTaskId, futureCallback);
        futureCallback.get(1000L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testRestartTaskFailureOnStart() throws Exception {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask(connectorTaskId, new ClusterConfigState(-1L, (SessionKey) null, Collections.singletonMap(CONNECTOR_NAME, 1), Collections.singletonMap(CONNECTOR_NAME, connectorConfig), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)), new HashSet(), new HashSet(), this.transformer), connectorConfig, taskConfig(SourceSink.SOURCE), this.herder, TargetState.STARTED))).thenReturn(false);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.MILLISECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(connectorTaskId, futureCallback);
        ((Worker) Mockito.verify(this.worker)).stopAndAwaitTask(connectorTaskId);
        try {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Expected restart callback to raise an exception");
        } catch (ExecutionException e) {
            Assert.assertEquals(ConnectException.class, e.getCause().getClass());
        }
    }

    @Test
    public void testRestartConnectorAndTasksUnknownConnector() {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(new RestartRequest("UnknownConnector", false, true), futureCallback);
        Assert.assertTrue(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause() instanceof NotFoundException);
    }

    @Test
    public void testRestartConnectorAndTasksNoStatus() throws Exception {
        RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true);
        ((StandaloneHerder) Mockito.doReturn(Optional.empty()).when(this.herder)).buildRestartPlan(restartRequest);
        this.connector = (Connector) Mockito.mock(BogusSinkConnector.class);
        expectAdd(SourceSink.SINK);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
        expectConfigValidation((Connector) Mockito.mock(SinkConnector.class), true, connectorConfig);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SINK), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(restartRequest, futureCallback);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        });
        Assert.assertTrue(executionException.getCause() instanceof NotFoundException);
        Assert.assertTrue(executionException.getMessage().contains("Status for connector"));
    }

    @Test
    public void testRestartConnectorAndTasksNoRestarts() throws Exception {
        RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true);
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        ConnectorStateInfo connectorStateInfo = (ConnectorStateInfo) Mockito.mock(ConnectorStateInfo.class);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartConnector())).thenReturn(false);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartTasks())).thenReturn(false);
        Mockito.when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
        ((StandaloneHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        this.connector = (Connector) Mockito.mock(BogusSinkConnector.class);
        expectAdd(SourceSink.SINK);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
        expectConfigValidation((Connector) Mockito.mock(SinkConnector.class), true, connectorConfig);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SINK), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(restartRequest, futureCallback);
        Assert.assertEquals(connectorStateInfo, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testRestartConnectorAndTasksOnlyConnector() throws Exception {
        RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true);
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        ConnectorStateInfo connectorStateInfo = (ConnectorStateInfo) Mockito.mock(ConnectorStateInfo.class);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartConnector())).thenReturn(true);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartTasks())).thenReturn(false);
        Mockito.when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
        ((StandaloneHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        this.connector = (Connector) Mockito.mock(BogusSinkConnector.class);
        expectAdd(SourceSink.SINK);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
        expectConfigValidation((Connector) Mockito.mock(SinkConnector.class), true, connectorConfig);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONNECTOR_NAME);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig), (CloseableConnectorContext) ArgumentMatchers.any(HerderConnectorContext.class), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SINK), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(restartRequest, futureCallback);
        Assert.assertEquals(connectorStateInfo, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        verifyConnectorStatusRestart();
    }

    @Test
    public void testRestartConnectorAndTasksOnlyTasks() throws Exception {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
        RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true);
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        ConnectorStateInfo connectorStateInfo = (ConnectorStateInfo) Mockito.mock(ConnectorStateInfo.class);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartConnector())).thenReturn(false);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartTasks())).thenReturn(true);
        Mockito.when(Integer.valueOf(restartPlan.restartTaskCount())).thenReturn(1);
        Mockito.when(Integer.valueOf(restartPlan.totalTaskCount())).thenReturn(1);
        Mockito.when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(connectorTaskId));
        Mockito.when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
        ((StandaloneHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        this.connector = (Connector) Mockito.mock(BogusSinkConnector.class);
        expectAdd(SourceSink.SINK);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
        expectConfigValidation((Connector) Mockito.mock(SinkConnector.class), true, connectorConfig);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTasks(Collections.singletonList(connectorTaskId));
        Mockito.when(Boolean.valueOf(this.worker.startSinkTask(connectorTaskId, new ClusterConfigState(-1L, (SessionKey) null, Collections.singletonMap(CONNECTOR_NAME, 1), Collections.singletonMap(CONNECTOR_NAME, connectorConfig), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(connectorTaskId, taskConfig(SourceSink.SINK)), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)), new HashSet(), new HashSet(), this.transformer), connectorConfig, taskConfig(SourceSink.SINK), this.herder, TargetState.STARTED))).thenReturn(true);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SINK), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(restartRequest, futureCallback);
        Assert.assertEquals(connectorStateInfo, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TaskStatus.class);
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put((TaskStatus) forClass.capture());
        Assert.assertEquals(AbstractStatus.State.RESTARTING, ((TaskStatus) forClass.getValue()).state());
        Assert.assertEquals(connectorTaskId, ((TaskStatus) forClass.getValue()).id());
    }

    @Test
    public void testRestartConnectorAndTasksBoth() throws Exception {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
        RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true);
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        ConnectorStateInfo connectorStateInfo = (ConnectorStateInfo) Mockito.mock(ConnectorStateInfo.class);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartConnector())).thenReturn(true);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartTasks())).thenReturn(true);
        Mockito.when(Integer.valueOf(restartPlan.restartTaskCount())).thenReturn(1);
        Mockito.when(Integer.valueOf(restartPlan.totalTaskCount())).thenReturn(1);
        Mockito.when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(connectorTaskId));
        Mockito.when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
        ((StandaloneHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TaskStatus.class);
        this.connector = (Connector) Mockito.mock(BogusSinkConnector.class);
        expectAdd(SourceSink.SINK);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
        expectConfigValidation((Connector) Mockito.mock(SinkConnector.class), true, connectorConfig);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONNECTOR_NAME);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTasks(Collections.singletonList(connectorTaskId));
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig), (CloseableConnectorContext) ArgumentMatchers.any(HerderConnectorContext.class), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass2.capture());
        Mockito.when(Boolean.valueOf(this.worker.startSinkTask(connectorTaskId, new ClusterConfigState(-1L, (SessionKey) null, Collections.singletonMap(CONNECTOR_NAME, 1), Collections.singletonMap(CONNECTOR_NAME, connectorConfig), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(connectorTaskId, taskConfig(SourceSink.SINK)), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)), new HashSet(), new HashSet(), this.transformer), connectorConfig, taskConfig(SourceSink.SINK), this.herder, TargetState.STARTED))).thenReturn(true);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SINK), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(restartRequest, futureCallback);
        Assert.assertEquals(connectorStateInfo, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        verifyConnectorStatusRestart();
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put((TaskStatus) forClass.capture());
        Assert.assertEquals(AbstractStatus.State.RESTARTING, ((TaskStatus) forClass.getValue()).state());
        Assert.assertEquals(connectorTaskId, ((TaskStatus) forClass.getValue()).id());
    }

    @Test
    public void testCreateAndStop() throws Exception {
        this.connector = (Connector) Mockito.mock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        expectStop();
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        this.herder.stop();
        Assert.assertTrue(this.noneConnectorClientConfigOverridePolicy.isClosed());
        ((Worker) Mockito.verify(this.worker)).stop();
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).stop();
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
    }

    @Test
    public void testAccessors() throws Exception {
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        System.out.println(connectorConfig);
        Callback callback = (Callback) Mockito.mock(Callback.class);
        Callback callback2 = (Callback) Mockito.mock(Callback.class);
        Callback callback3 = (Callback) Mockito.mock(Callback.class);
        Callback callback4 = (Callback) Mockito.mock(Callback.class);
        Callback callback5 = (Callback) Mockito.mock(Callback.class);
        ((Callback) Mockito.doNothing().when(callback)).onCompletion((Throwable) null, Collections.EMPTY_SET);
        ((Callback) Mockito.doNothing().when(callback2)).onCompletion((Throwable) ArgumentMatchers.any(NotFoundException.class), Mockito.isNull());
        ((Callback) Mockito.doNothing().when(callback4)).onCompletion((Throwable) ArgumentMatchers.any(NotFoundException.class), Mockito.isNull());
        ((Callback) Mockito.doNothing().when(callback5)).onCompletion((Throwable) ArgumentMatchers.any(NotFoundException.class), Mockito.isNull());
        ((Callback) Mockito.doNothing().when(callback3)).onCompletion((Throwable) ArgumentMatchers.any(NotFoundException.class), Mockito.isNull());
        this.connector = (Connector) Mockito.mock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        expectConfigValidation(this.connector, true, connectorConfig);
        ((Callback) Mockito.doNothing().when(callback)).onCompletion((Throwable) null, Collections.singleton(CONNECTOR_NAME));
        ((Callback) Mockito.doNothing().when(callback2)).onCompletion((Throwable) null, new ConnectorInfo(CONNECTOR_NAME, connectorConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE));
        ((Callback) Mockito.doNothing().when(callback4)).onCompletion((Throwable) null, Arrays.asList(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE))));
        ((Callback) Mockito.doNothing().when(callback5)).onCompletion((Throwable) null, Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)));
        this.herder.connectors(callback);
        this.herder.connectorInfo(CONNECTOR_NAME, callback2);
        this.herder.connectorConfig(CONNECTOR_NAME, callback3);
        this.herder.taskConfigs(CONNECTOR_NAME, callback4);
        this.herder.tasksConfig(CONNECTOR_NAME, callback5);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        Mockito.reset(new WorkerConfigTransformer[]{this.transformer});
        this.herder.connectors(callback);
        this.herder.connectorInfo(CONNECTOR_NAME, callback2);
        this.herder.connectorConfig(CONNECTOR_NAME, callback3);
        this.herder.taskConfigs(CONNECTOR_NAME, callback4);
        this.herder.tasksConfig(CONNECTOR_NAME, callback5);
        ((WorkerConfigTransformer) Mockito.verify(this.transformer, Mockito.never())).transform((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.any());
    }

    @Test
    public void testPutConnectorConfig() throws Exception {
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        HashMap hashMap = new HashMap(connectorConfig);
        hashMap.put("foo", "bar");
        Callback callback = (Callback) Mockito.mock(Callback.class);
        this.connector = (Connector) Mockito.mock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig, hashMap);
        ((Callback) Mockito.doNothing().when(callback)).onCompletion((Throwable) null, connectorConfig);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONNECTOR_NAME);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Map.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) forClass.capture(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass2.capture());
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
        Mockito.when(this.worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(this.plugins, hashMap, true))).thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTasks(Collections.singletonList(connectorTaskId));
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).put(new TaskStatus(connectorTaskId, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(connectorTaskId), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.eq(hashMap), (Map) ArgumentMatchers.eq(taskConfig(SourceSink.SOURCE)), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        this.herder.connectorConfig(CONNECTOR_NAME, callback);
        FutureCallback futureCallback = new FutureCallback();
        ((Callback) Mockito.doNothing().when(callback)).onCompletion((Throwable) null, hashMap);
        this.herder.putConnectorConfig(CONNECTOR_NAME, hashMap, true, futureCallback);
        Assert.assertEquals(new ConnectorInfo(CONNECTOR_NAME, hashMap, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE), ((Herder.Created) futureCallback.get(1000L, TimeUnit.SECONDS)).result());
        Assert.assertEquals("bar", ((Map) forClass.getValue()).get("foo"));
        this.herder.connectorConfig(CONNECTOR_NAME, callback);
        Mockito.verifyNoMoreInteractions(new Object[]{callback});
    }

    @Test
    public void testPutTaskConfigs() {
        Callback callback = (Callback) Mockito.mock(Callback.class);
        Assert.assertThrows(UnsupportedOperationException.class, () -> {
            this.herder.putTaskConfigs(CONNECTOR_NAME, Collections.singletonList(Collections.singletonMap("config", "value")), callback, (InternalRequestSignature) null);
        });
    }

    @Test
    public void testCorruptConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("name", CONNECTOR_NAME);
        hashMap.put("connector.class", BogusSinkConnector.class.getName());
        hashMap.put("topics", TOPICS_LIST_STR);
        Connector connector = (Connector) Mockito.mock(SinkConnector.class);
        Mockito.when(connector.validate(hashMap)).thenReturn(new Config(Arrays.asList(new ConfigValue("foo.invalid.key", (Object) null, Collections.emptyList(), new ArrayList(Collections.singletonList("This is an error in your config!"))))));
        ConfigDef configDef = new ConfigDef();
        configDef.define("foo.invalid.key", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
        Mockito.when(this.worker.configTransformer()).thenReturn(this.transformer);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Map.class);
        Mockito.when(this.transformer.transform((Map) forClass.capture())).thenAnswer(invocationOnMock -> {
            return (Map) forClass.getValue();
        });
        Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
        Mockito.when(this.plugins.connectorLoader(ArgumentMatchers.anyString())).thenReturn(this.pluginLoader);
        Mockito.when(this.plugins.withClassLoader(this.pluginLoader)).thenReturn(this.loaderSwap);
        Mockito.when(this.plugins.newConnector(ArgumentMatchers.anyString())).thenReturn(connector);
        Mockito.when(connector.config()).thenReturn(configDef);
        this.herder.putConnectorConfig(CONNECTOR_NAME, hashMap, true, this.createCallback);
        ExecutionException executionException = (ExecutionException) Assert.assertThrows("Should have failed to configure connector", ExecutionException.class, () -> {
        });
        Assert.assertNotNull(executionException.getCause());
        Throwable cause = executionException.getCause();
        Assert.assertTrue(cause instanceof BadRequestException);
        Assert.assertEquals(cause.getMessage(), "Connector configuration is invalid and contains the following 1 error(s):\nThis is an error in your config!\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`");
        ((LoaderSwap) Mockito.verify(this.loaderSwap)).close();
    }

    @Test
    public void testTargetStates() throws Exception {
        this.connector = (Connector) Mockito.mock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        expectConfigValidation((Connector) Mockito.mock(SourceConnector.class), true, connectorConfig);
        expectTargetState(CONNECTOR_NAME, TargetState.PAUSED);
        expectTargetState(CONNECTOR_NAME, TargetState.STOPPED);
        expectStop();
        FutureCallback futureCallback = new FutureCallback();
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, this.createCallback);
        Assert.assertEquals(createdInfo(SourceSink.SOURCE), ((Herder.Created) this.createCallback.get(1000L, TimeUnit.SECONDS)).result());
        this.herder.pauseConnector(CONNECTOR_NAME);
        this.herder.stopConnector(CONNECTOR_NAME, futureCallback);
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
        futureCallback.get(10L, TimeUnit.SECONDS);
        this.herder.taskConfigs(CONNECTOR_NAME, futureCallback2);
        Assert.assertEquals(Collections.emptyList(), futureCallback2.get(1L, TimeUnit.SECONDS));
        this.herder.stop();
        Assert.assertTrue(this.noneConnectorClientConfigOverridePolicy.isClosed());
        ((Worker) Mockito.verify(this.worker)).stop();
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).stop();
    }

    @Test
    public void testModifyConnectorOffsetsUnknownConnector() {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.alterConnectorOffsets("unknown-connector", Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), futureCallback);
        Assert.assertTrue(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause() instanceof NotFoundException);
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.resetConnectorOffsets("unknown-connector", futureCallback2);
        Assert.assertTrue(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause() instanceof NotFoundException);
    }

    @Test
    public void testModifyConnectorOffsetsConnectorNotInStoppedState() {
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        this.herder.configState = new ClusterConfigState(10L, (SessionKey) null, Collections.singletonMap(CONNECTOR_NAME, 3), Collections.singletonMap(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE)), Collections.singletonMap(CONNECTOR_NAME, TargetState.PAUSED), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)), Collections.emptySet(), Collections.emptySet());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.alterConnectorOffsets(CONNECTOR_NAME, Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), futureCallback);
        Assert.assertTrue(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause() instanceof BadRequestException);
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.resetConnectorOffsets(CONNECTOR_NAME, futureCallback2);
        Assert.assertTrue(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause() instanceof BadRequestException);
    }

    @Test
    public void testAlterConnectorOffsets() throws Exception {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        Message message = new Message("The offsets for this connector have been altered successfully");
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, message);
            return null;
        }).when(this.worker)).modifyConnectorOffsets((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig(SourceSink.SOURCE)), (Map) ArgumentMatchers.any(Map.class), (Callback) forClass.capture());
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        this.herder.configState = new ClusterConfigState(10L, (SessionKey) null, Collections.singletonMap(CONNECTOR_NAME, 0), Collections.singletonMap(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE)), Collections.singletonMap(CONNECTOR_NAME, TargetState.STOPPED), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)), Collections.emptySet(), Collections.emptySet());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.alterConnectorOffsets(CONNECTOR_NAME, Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), futureCallback);
        Assert.assertEquals(message, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testResetConnectorOffsets() throws Exception {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        Message message = new Message("The offsets for this connector have been reset successfully");
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, message);
            return null;
        }).when(this.worker)).modifyConnectorOffsets((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig(SourceSink.SOURCE)), (Map) Mockito.isNull(), (Callback) forClass.capture());
        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
        this.herder.configState = new ClusterConfigState(10L, (SessionKey) null, Collections.singletonMap(CONNECTOR_NAME, 0), Collections.singletonMap(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE)), Collections.singletonMap(CONNECTOR_NAME, TargetState.STOPPED), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)), Collections.emptySet(), Collections.emptySet());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.resetConnectorOffsets(CONNECTOR_NAME, futureCallback);
        Assert.assertEquals(message, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
    }

    private void expectAdd(SourceSink sourceSink) {
        Map<String, String> connectorConfig = connectorConfig(sourceSink);
        SourceConnectorConfig sourceConnectorConfig = sourceSink == SourceSink.SOURCE ? new SourceConnectorConfig(this.plugins, connectorConfig, true) : new SinkConnectorConfig(this.plugins, connectorConfig);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONNECTOR_NAME), (Map) ArgumentMatchers.eq(connectorConfig), (CloseableConnectorContext) ArgumentMatchers.any(HerderConnectorContext.class), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        Mockito.when(Boolean.valueOf(this.worker.isRunning(CONNECTOR_NAME))).thenReturn(true);
        if (sourceSink == SourceSink.SOURCE) {
            Mockito.when(Boolean.valueOf(this.worker.isTopicCreationEnabled())).thenReturn(true);
        }
        Map<String, String> connectorConfig2 = connectorConfig(sourceSink);
        Map<String, String> taskConfig = taskConfig(sourceSink);
        Mockito.when(this.worker.connectorTaskConfigs(CONNECTOR_NAME, sourceConnectorConfig)).thenReturn(Collections.singletonList(taskConfig));
        ClusterConfigState clusterConfigState = new ClusterConfigState(-1L, (SessionKey) null, Collections.singletonMap(CONNECTOR_NAME, 1), Collections.singletonMap(CONNECTOR_NAME, connectorConfig2), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig2)), new HashSet(), new HashSet(), this.transformer);
        if (sourceSink.equals(SourceSink.SOURCE)) {
            Mockito.when(Boolean.valueOf(this.worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME, 0), clusterConfigState, connectorConfig(sourceSink), taskConfig, this.herder, TargetState.STARTED))).thenReturn(true);
        } else {
            Mockito.when(Boolean.valueOf(this.worker.startSinkTask(new ConnectorTaskId(CONNECTOR_NAME, 0), clusterConfigState, connectorConfig(sourceSink), taskConfig, this.herder, TargetState.STARTED))).thenReturn(true);
        }
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Map.class);
        Mockito.when(this.herder.connectorType((Map) forClass2.capture())).thenAnswer(invocationOnMock2 -> {
            String str = (String) ((Map) forClass2.getValue()).get("connector.class");
            return BogusSourceConnector.class.getName().equals(str) ? ConnectorType.SOURCE : BogusSinkConnector.class.getName().equals(str) ? ConnectorType.SINK : ConnectorType.UNKNOWN;
        });
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONNECTOR_NAME))).thenReturn(Boolean.valueOf(sourceSink == SourceSink.SINK));
    }

    private void expectTargetState(String str, TargetState targetState) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, targetState);
            return null;
        }).when(this.worker)).setTargetState((String) ArgumentMatchers.eq(str), (TargetState) ArgumentMatchers.eq(targetState), (Callback) forClass.capture());
    }

    private ConnectorInfo createdInfo(SourceSink sourceSink) {
        return new ConnectorInfo(CONNECTOR_NAME, connectorConfig(sourceSink), Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), SourceSink.SOURCE == sourceSink ? ConnectorType.SOURCE : ConnectorType.SINK);
    }

    private void expectStop() {
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTasks(Collections.singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONNECTOR_NAME);
    }

    private void expectDestroy() {
        expectStop();
    }

    private static Map<String, String> connectorConfig(SourceSink sourceSink) {
        HashMap hashMap = new HashMap();
        hashMap.put("name", CONNECTOR_NAME);
        hashMap.put("connector.class", (sourceSink == SourceSink.SINK ? BogusSinkConnector.class : BogusSourceConnector.class).getName());
        hashMap.put("tasks.max", "1");
        if (sourceSink == SourceSink.SINK) {
            hashMap.put("topics", TOPICS_LIST_STR);
        } else {
            hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
            hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        }
        return hashMap;
    }

    private static Map<String, String> taskConfig(SourceSink sourceSink) {
        HashMap hashMap = new HashMap();
        hashMap.put("foo", "bar");
        hashMap.put("task.class", (sourceSink == SourceSink.SINK ? BogusSinkTask.class : BogusSourceTask.class).getName());
        if (sourceSink == SourceSink.SINK) {
            hashMap.put("topics", TOPICS_LIST_STR);
        }
        return hashMap;
    }

    private void expectConfigValidation(Connector connector, boolean z, Map<String, String>... mapArr) {
        Mockito.when(this.worker.configTransformer()).thenReturn(this.transformer);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Map.class);
        Mockito.when(this.transformer.transform((Map) forClass.capture())).thenAnswer(invocationOnMock -> {
            return (Map) forClass.getValue();
        });
        Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
        Mockito.when(this.plugins.connectorLoader(ArgumentMatchers.anyString())).thenReturn(this.pluginLoader);
        Mockito.when(this.plugins.withClassLoader(this.pluginLoader)).thenReturn(this.loaderSwap);
        if (z) {
            Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
            Mockito.when(this.plugins.newConnector(ArgumentMatchers.anyString())).thenReturn(connector);
        }
        Mockito.when(connector.config()).thenReturn(new ConfigDef());
        for (Map<String, String> map : mapArr) {
            Mockito.when(connector.validate(map)).thenReturn(new Config(Collections.emptyList()));
        }
    }

    private void verifyConnectorStatusRestart() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ConnectorStatus.class);
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).put((ConnectorStatus) forClass.capture());
        Assert.assertEquals(CONNECTOR_NAME, ((ConnectorStatus) forClass.getValue()).id());
        Assert.assertEquals(AbstractStatus.State.RESTARTING, ((ConnectorStatus) forClass.getValue()).state());
    }
}
