package org.apache.kafka.connect.runtime.standalone;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerConnector;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({StandaloneHerder.class, Plugins.class, WorkerConnector.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.class */
public class StandaloneHerderTest {
    private static final String CONNECTOR_NAME = "test";
    private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
    private static final String TOPICS_LIST_STR = "topic1,topic2";
    private static final int DEFAULT_MAX_TASKS = 1;
    private static final String WORKER_ID = "localhost:8083";
    private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
    private StandaloneHerder herder;
    private Connector connector;

    @Mock
    protected Worker worker;

    @Mock
    protected WorkerConfigTransformer transformer;

    @Mock
    private Plugins plugins;

    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private DelegatingClassLoader delegatingLoader;

    @Mock
    protected Callback<Herder.Created<ConnectorInfo>> createCallback;

    @Mock
    protected StatusBackingStore statusBackingStore;

    /**
     * Placeholder sink connector type. In this test it is only mocked and used as the source of a
     * {@code connector.class} name; it adds no members of its own.
     */
    private abstract class BogusSinkConnector extends SinkConnector {
    }

    /**
     * Placeholder task type whose class name is written into sink task configs by {@code taskConfig}.
     *
     * NOTE(review): despite its name this extends {@link SourceTask}; within this file only its class
     * name is used, so the supertype has no visible effect here - confirm whether that is intentional.
     */
    private abstract class BogusSinkTask extends SourceTask {
    }

    /**
     * Placeholder source connector type. In this test it is only mocked and used as the source of a
     * {@code connector.class} name; it adds no members of its own.
     */
    private abstract class BogusSourceConnector extends SourceConnector {
    }

    /**
     * Placeholder source task type whose class name is written into source task configs by
     * {@code taskConfig}; it adds no members of its own.
     */
    private abstract class BogusSourceTask extends SourceTask {
    }

    /**
     * Selects whether a helper or test should build source-flavoured or sink-flavoured configs and
     * expectations.
     *
     * NOTE(review): the decompiler reported that this type's access modifier was changed from
     * {@code private}; the {@code public} visibility below may not reflect the original source.
     */
    public enum SourceSink {
        SOURCE,
        SINK
    }

    /**
     * Creates fresh mocks before each test: a mocked {@link Worker}, a partially mocked
     * {@link StandaloneHerder} (only {@code connectorTypeForClass} is mocked), the plugin and class
     * loader mocks, and static mocking of {@link Plugins} and {@link WorkerConnector}.
     * It also records a lenient expectation on the two-argument {@code transform} call that answers
     * with the very map it was given.
     */
    @Before
    public void setup() {
        worker = PowerMock.createMock(Worker.class);
        herder = PowerMock.createPartialMock(
                StandaloneHerder.class,
                new String[]{"connectorTypeForClass"},
                worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer));
        plugins = PowerMock.createMock(Plugins.class);
        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
        delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);

        PowerMock.mockStatic(Plugins.class);
        PowerMock.mockStatic(WorkerConnector.class);

        // Echo back whatever config map the transformer is asked to transform for the test connector.
        final Capture<Map<String, String>> transformedConfig = Capture.newInstance();
        EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(transformedConfig)))
                .andAnswer(transformedConfig::getValue)
                .anyTimes();
    }

    /**
     * Submits a source connector config and verifies every interaction recorded by
     * {@code expectAdd} and {@code expectConfigValidation} takes place.
     */
    @Test
    public void testCreateSourceConnector() throws Exception {
        connector = PowerMock.createMock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);

        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        Connector validatedConnector = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(validatedConnector, true, sourceConfig);
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        PowerMock.verifyAll();
    }

    /**
     * Submits a source connector config whose {@code "name"} entry has been removed and expects the
     * create callback to complete with some error and a null result.
     */
    @Test
    public void testCreateConnectorFailedBasicValidation() {
        connector = PowerMock.createMock(BogusSourceConnector.class);

        Map<String, String> namelessConfig = connectorConfig(SourceSink.SOURCE);
        namelessConfig.remove("name");

        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
        EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);

        // The single-argument transform simply hands back the map it received.
        final Capture<Map<String, String>> transformedConfig = EasyMock.newCapture();
        EasyMock.expect(transformer.transform(EasyMock.capture(transformedConfig)))
                .andAnswer(transformedConfig::getValue);

        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());

        Config validationResult = new Config(Collections.singletonList(new ConfigValue("foo.bar")));
        EasyMock.expect(connectorMock.validate(namelessConfig)).andReturn(validationResult);
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);

        createCallback.onCompletion(EasyMock.<Throwable>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
        PowerMock.expectLastCall();
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, namelessConfig, false, createCallback);

        PowerMock.verifyAll();
    }

    /**
     * Has the mocked connector's {@code validate} return a {@code foo.bar} value carrying an error
     * message and expects the create callback to complete with some error and a null result.
     */
    @Test
    public void testCreateConnectorFailedCustomValidation() {
        connector = PowerMock.createMock(BogusSourceConnector.class);
        Connector connectorMock = PowerMock.createMock(SourceConnector.class);

        EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
        // The single-argument transform simply hands back the map it received.
        final Capture<Map<String, String>> transformedConfig = EasyMock.newCapture();
        EasyMock.expect(transformer.transform(EasyMock.capture(transformedConfig)))
                .andAnswer(transformedConfig::getValue);

        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);

        ConfigDef definition = new ConfigDef();
        definition.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar doc");
        EasyMock.expect(connectorMock.config()).andReturn(definition);

        // The connector's own validation reports an error on foo.bar.
        ConfigValue failingValue = new ConfigValue("foo.bar");
        failingValue.addErrorMessage("Failed foo.bar validation");
        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        EasyMock.expect(connectorMock.validate(sourceConfig))
                .andReturn(new Config(Collections.singletonList(failingValue)));
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);

        createCallback.onCompletion(EasyMock.<Throwable>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
        PowerMock.expectLastCall();
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        PowerMock.verifyAll();
    }

    /**
     * Submits the same connector config twice with the replace flag set to {@code false}. The first
     * submission is expected to go through the add path; the second is expected to complete the
     * callback with some error and a null result.
     */
    @Test
    public void testCreateConnectorAlreadyExists() throws Exception {
        connector = PowerMock.createMock(BogusSourceConnector.class);
        // Expectations for the first, successful creation.
        expectAdd(SourceSink.SOURCE);

        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(connectorMock, true, sourceConfig, sourceConfig);

        // Expectations for the second submission.
        EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
        final Capture<Map<String, String>> transformedConfig = EasyMock.newCapture();
        EasyMock.expect(transformer.transform(EasyMock.capture(transformedConfig)))
                .andAnswer(transformedConfig::getValue);
        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);

        createCallback.onCompletion(EasyMock.<Throwable>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
        PowerMock.expectLastCall();
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);
        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        PowerMock.verifyAll();
    }

    /**
     * Submits a sink connector config and verifies every interaction recorded by
     * {@code expectAdd} and {@code expectConfigValidation} takes place.
     */
    @Test
    public void testCreateSinkConnector() throws Exception {
        connector = PowerMock.createMock(BogusSinkConnector.class);
        expectAdd(SourceSink.SINK);

        Map<String, String> sinkConfig = connectorConfig(SourceSink.SINK);
        Connector validatedConnector = PowerMock.createMock(SinkConnector.class);
        expectConfigValidation(validatedConnector, true, sinkConfig);
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sinkConfig, false, createCallback);

        PowerMock.verifyAll();
    }

    /**
     * Creates a connector, deletes it, and then deletes it a second time. The first deletion must
     * complete normally; the second must fail with a {@link NotFoundException} as its cause.
     */
    @Test
    public void testDestroyConnector() throws Exception {
        connector = PowerMock.createMock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);

        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        Connector validatedConnector = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(validatedConnector, true, sourceConfig);

        EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.emptyList());
        // Deleting is expected to record a DESTROYED status for the connector.
        statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
        expectDestroy();
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        FutureCallback<Herder.Created<ConnectorInfo>> firstDelete = new FutureCallback<>();
        herder.deleteConnectorConfig(CONNECTOR_NAME, firstDelete);
        firstDelete.get(1000L, TimeUnit.MILLISECONDS);

        // The connector no longer exists, so a repeated delete has to fail.
        FutureCallback<Herder.Created<ConnectorInfo>> secondDelete = new FutureCallback<>();
        herder.deleteConnectorConfig(CONNECTOR_NAME, secondDelete);
        try {
            secondDelete.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Should have thrown NotFoundException");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof NotFoundException);
        }

        PowerMock.verifyAll();
    }

    /**
     * Creates a connector and restarts it, expecting the worker to stop and then start the connector
     * (both reporting success) and the restart callback to complete within one second.
     */
    @Test
    public void testRestartConnector() throws Exception {
        expectAdd(SourceSink.SOURCE);

        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        Connector validatedConnector = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(validatedConnector, true, sourceConfig);

        EasyMock.expect(worker.stopConnector(CONNECTOR_NAME)).andReturn(true);
        EasyMock.expect(worker.startConnector(
                EasyMock.eq(CONNECTOR_NAME),
                EasyMock.eq(sourceConfig),
                EasyMock.anyObject(HerderConnectorContext.class),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED))).andReturn(true);
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        FutureCallback<Void> restartCallback = new FutureCallback<>();
        herder.restartConnector(CONNECTOR_NAME, restartCallback);
        restartCallback.get(1000L, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    /**
     * Creates a connector and restarts it while the worker reports {@code false} from
     * {@code startConnector}; the restart callback must then fail with a cause whose class is
     * exactly {@link ConnectException}.
     */
    @Test
    public void testRestartConnectorFailureOnStart() throws Exception {
        expectAdd(SourceSink.SOURCE);

        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        Connector validatedConnector = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(validatedConnector, true, sourceConfig);

        EasyMock.expect(worker.stopConnector(CONNECTOR_NAME)).andReturn(true);
        // The restart's start attempt is reported as unsuccessful.
        EasyMock.expect(worker.startConnector(
                EasyMock.eq(CONNECTOR_NAME),
                EasyMock.eq(sourceConfig),
                EasyMock.anyObject(HerderConnectorContext.class),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED))).andReturn(false);
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        FutureCallback<Void> restartCallback = new FutureCallback<>();
        herder.restartConnector(CONNECTOR_NAME, restartCallback);
        try {
            restartCallback.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail();
        } catch (ExecutionException expected) {
            Assert.assertEquals(ConnectException.class, expected.getCause().getClass());
        }

        PowerMock.verifyAll();
    }

    /**
     * Creates a connector and restarts its task 0, expecting the worker to stop the task and start
     * it again with the expected cluster config state, and the restart callback to complete within
     * one second.
     */
    @Test
    public void testRestartTask() throws Exception {
        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
        expectAdd(SourceSink.SOURCE);

        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        Connector validatedConnector = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(validatedConnector, true, sourceConfig);

        worker.stopAndAwaitTask(taskId);
        EasyMock.expectLastCall();

        // Config snapshot the herder is expected to hand to the worker when starting the task.
        ClusterConfigState expectedState = new ClusterConfigState(
                -1L,
                Collections.singletonMap(CONNECTOR_NAME, Integer.valueOf(DEFAULT_MAX_TASKS)),
                Collections.singletonMap(CONNECTOR_NAME, sourceConfig),
                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
                Collections.singletonMap(taskId, taskConfig(SourceSink.SOURCE)),
                new HashSet<>(),
                transformer);
        EasyMock.expect(worker.startTask(
                taskId, expectedState, sourceConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED))
                .andReturn(true);
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        FutureCallback<Void> restartCallback = new FutureCallback<>();
        herder.restartTask(taskId, restartCallback);
        restartCallback.get(1000L, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    /**
     * Creates a connector and restarts its task 0 while the worker reports {@code false} from
     * {@code startTask}; the restart callback must then fail with a cause whose class is exactly
     * {@link ConnectException}.
     */
    @Test
    public void testRestartTaskFailureOnStart() throws Exception {
        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
        expectAdd(SourceSink.SOURCE);

        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        Connector validatedConnector = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(validatedConnector, true, sourceConfig);

        worker.stopAndAwaitTask(taskId);
        EasyMock.expectLastCall();

        // Config snapshot the herder is expected to hand to the worker when starting the task.
        ClusterConfigState expectedState = new ClusterConfigState(
                -1L,
                Collections.singletonMap(CONNECTOR_NAME, Integer.valueOf(DEFAULT_MAX_TASKS)),
                Collections.singletonMap(CONNECTOR_NAME, sourceConfig),
                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
                Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)),
                new HashSet<>(),
                transformer);
        // The restart's start attempt is reported as unsuccessful.
        EasyMock.expect(worker.startTask(
                taskId, expectedState, sourceConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED))
                .andReturn(false);
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        FutureCallback<Void> restartCallback = new FutureCallback<>();
        herder.restartTask(taskId, restartCallback);
        try {
            restartCallback.get(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Expected restart callback to raise an exception");
        } catch (ExecutionException expected) {
            Assert.assertEquals(ConnectException.class, expected.getCause().getClass());
        }

        PowerMock.verifyAll();
    }

    /**
     * Creates a connector and then stops the herder, expecting the connector and its task to be
     * stopped and both the status backing store and the worker to be stopped as well.
     */
    @Test
    public void testCreateAndStop() throws Exception {
        connector = PowerMock.createMock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);

        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);
        Connector validatedConnector = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(validatedConnector, true, sourceConfig);

        expectStop();

        statusBackingStore.stop();
        EasyMock.expectLastCall();
        worker.stop();
        EasyMock.expectLastCall();
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);
        herder.stop();

        PowerMock.verifyAll();
    }

    /**
     * Exercises the read accessors ({@code connectors}, {@code connectorInfo},
     * {@code connectorConfig}, {@code taskConfigs}) before and after a connector is created.
     *
     * Before creation the listing callback must receive an empty set and the other three callbacks
     * must complete with some error and a null result. After creation each callback must receive the
     * connector's name, info, config, and task configs respectively. For the second round the
     * transformer mock is re-armed to throw on the two-argument {@code transform}, so the test fails
     * if any accessor triggers a config transformation.
     */
    @Test
    public void testAccessors() throws Exception {
        Map<String, String> sourceConfig = connectorConfig(SourceSink.SOURCE);

        Callback listConnectorsCb = PowerMock.createMock(Callback.class);
        Callback connectorInfoCb = PowerMock.createMock(Callback.class);
        Callback connectorConfigCb = PowerMock.createMock(Callback.class);
        Callback taskConfigsCb = PowerMock.createMock(Callback.class);

        // Expected results while no connector exists yet.
        listConnectorsCb.onCompletion((Throwable) null, Collections.EMPTY_SET);
        EasyMock.expectLastCall();
        connectorInfoCb.onCompletion((Throwable) EasyMock.anyObject(), EasyMock.isNull());
        EasyMock.expectLastCall();
        connectorConfigCb.onCompletion((Throwable) EasyMock.anyObject(), EasyMock.isNull());
        EasyMock.expectLastCall();
        taskConfigsCb.onCompletion((Throwable) EasyMock.anyObject(), EasyMock.isNull());
        EasyMock.expectLastCall();

        // Create the connector.
        connector = PowerMock.createMock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        expectConfigValidation(connector, true, sourceConfig);

        // Expected results once the connector exists.
        listConnectorsCb.onCompletion((Throwable) null, Collections.singleton(CONNECTOR_NAME));
        EasyMock.expectLastCall();
        ConnectorInfo expectedInfo = new ConnectorInfo(
                CONNECTOR_NAME, sourceConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE);
        connectorInfoCb.onCompletion((Throwable) null, expectedInfo);
        EasyMock.expectLastCall();
        connectorConfigCb.onCompletion((Throwable) null, sourceConfig);
        EasyMock.expectLastCall();
        TaskInfo expectedTaskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE));
        taskConfigsCb.onCompletion((Throwable) null, Arrays.asList(expectedTaskInfo));
        EasyMock.expectLastCall();

        PowerMock.replayAll();

        // Round one: nothing has been created yet.
        herder.connectors(listConnectorsCb);
        herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
        herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);

        herder.putConnectorConfig(CONNECTOR_NAME, sourceConfig, false, createCallback);

        // From here on any two-argument transform call makes the test fail.
        EasyMock.reset(transformer);
        EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>anyObject()))
                .andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info"))
                .anyTimes();
        EasyMock.replay(transformer);

        // Round two: the connector exists.
        herder.connectors(listConnectorsCb);
        herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
        herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);

        PowerMock.verifyAll();
    }

    /**
     * Creates a connector, reads its config back, replaces the config with one that additionally
     * maps {@code "foo"} to {@code "bar"}, and reads the config back again.
     *
     * The replacement is expected to stop and restart the connector; the config captured from the
     * restart must contain the new entry, and the second read must return the updated config.
     */
    @Test
    public void testPutConnectorConfig() throws Exception {
        Map<String, String> initialConfig = connectorConfig(SourceSink.SOURCE);
        Map<String, String> updatedConfig = new HashMap<>(initialConfig);
        updatedConfig.put("foo", "bar");

        Callback<Map<String, String>> connectorConfigCb = PowerMock.createMock(Callback.class);
        Callback<Herder.Created<ConnectorInfo>> putConnectorConfigCb = PowerMock.createMock(Callback.class);

        // Initial creation.
        connector = PowerMock.createMock(BogusSourceConnector.class);
        expectAdd(SourceSink.SOURCE);
        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
        expectConfigValidation(connectorMock, true, initialConfig);

        // First read returns the initial config.
        connectorConfigCb.onCompletion(null, initialConfig);
        EasyMock.expectLastCall();

        // Replacing the config stops the connector and starts it again.
        EasyMock.expect(worker.stopConnector(CONNECTOR_NAME)).andReturn(true);
        final Capture<Map<String, String>> restartedWithConfig = EasyMock.newCapture();
        EasyMock.expect(worker.startConnector(
                EasyMock.eq(CONNECTOR_NAME),
                EasyMock.capture(restartedWithConfig),
                EasyMock.<ConnectorContext>anyObject(),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED))).andReturn(true);
        EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, updatedConfig)))
                .andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
        EasyMock.expect(worker.isSinkConnector(CONNECTOR_NAME)).andReturn(false);

        ConnectorInfo updatedInfo = new ConnectorInfo(
                CONNECTOR_NAME, updatedConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE);
        putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, updatedInfo));
        EasyMock.expectLastCall();
        expectConfigValidation(connectorMock, false, updatedConfig);

        // Second read returns the updated config.
        connectorConfigCb.onCompletion(null, updatedConfig);
        EasyMock.expectLastCall();
        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, initialConfig, false, createCallback);
        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
        herder.putConnectorConfig(CONNECTOR_NAME, updatedConfig, true, putConnectorConfigCb);
        Assert.assertEquals("bar", restartedWithConfig.getValue().get("foo"));
        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);

        PowerMock.verifyAll();
    }

    /**
     * Calls {@code putTaskConfigs} on the herder; the test passes only if the call throws
     * {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testPutTaskConfigs() {
        Callback<Void> unusedCallback = PowerMock.createMock(Callback.class);
        PowerMock.replayAll();

        List<Map<String, String>> taskConfigs = Arrays.asList(Collections.singletonMap("config", "value"));
        herder.putTaskConfigs(CONNECTOR_NAME, taskConfigs, unusedCallback);

        PowerMock.verifyAll();
    }

    /**
     * Submits a sink connector config for which the mocked connector's {@code validate} reports an
     * error on {@code foo.invalid.key}, and checks that the callback receives a
     * {@link BadRequestException} with the expected message and a null result.
     */
    @Test
    public void testCorruptConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("name", CONNECTOR_NAME);
        config.put("connector.class", BogusSinkConnector.class.getName());
        config.put("topics", TOPICS_LIST_STR);

        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
        // Validation result carrying exactly one error message.
        ConfigValue invalidValue = new ConfigValue(
                "foo.invalid.key",
                null,
                Collections.emptyList(),
                new ArrayList<>(Collections.singletonList("This is an error in your config!")));
        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Arrays.asList(invalidValue)));

        ConfigDef configDef = new ConfigDef();
        configDef.define("foo.invalid.key", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");

        EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
        // The single-argument transform simply hands back the map it received.
        final Capture<Map<String, String>> transformedConfig = EasyMock.newCapture();
        EasyMock.expect(transformer.transform(EasyMock.capture(transformedConfig)))
                .andAnswer(transformedConfig::getValue);
        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
        EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);

        Callback<Herder.Created<ConnectorInfo>> callback = PowerMock.createMock(Callback.class);
        final Capture<BadRequestException> reportedError = Capture.newInstance();
        callback.onCompletion(EasyMock.capture(reportedError), EasyMock.isNull(Herder.Created.class));
        PowerMock.replayAll();

        herder.putConnectorConfig(CONNECTOR_NAME, config, true, callback);

        // JUnit's assertEquals takes the expected value first, so failure output labels the values correctly.
        Assert.assertEquals(
                "Connector configuration is invalid and contains the following 1 error(s):\nThis is an error in your config!\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`",
                reportedError.getValue().getMessage());

        PowerMock.verifyAll();
    }

    /**
     * Records the expectations for successfully adding the test connector: the worker starts the
     * connector, reports it as running, supplies a single task config, and starts task 0; the create
     * callback receives a {@code Created(true, ...)} result; and the partially mocked herder answers
     * {@code connectorTypeForClass} for both bogus connector classes.
     *
     * @param sourceSink whether the expectations describe a source or a sink connector
     */
    private void expectAdd(SourceSink sourceSink) {
        Map<String, String> connectorProps = connectorConfig(sourceSink);
        // Declared as the shared supertype because the two branches build different config classes;
        // the name is fully qualified since this file has no import for it.
        org.apache.kafka.connect.runtime.ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE
                ? new SourceConnectorConfig(plugins, connectorProps)
                : new SinkConnectorConfig(plugins, connectorProps);

        EasyMock.expect(worker.startConnector(
                EasyMock.eq(CONNECTOR_NAME),
                EasyMock.eq(connectorProps),
                EasyMock.anyObject(HerderConnectorContext.class),
                EasyMock.eq(herder),
                EasyMock.eq(TargetState.STARTED))).andReturn(true);
        EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);

        ConnectorType expectedType = sourceSink == SourceSink.SOURCE ? ConnectorType.SOURCE : ConnectorType.SINK;
        ConnectorInfo expectedInfo = new ConnectorInfo(
                CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), expectedType);
        createCallback.onCompletion(null, new Herder.Created<>(true, expectedInfo));
        EasyMock.expectLastCall();

        // The connector yields exactly one task config.
        Map<String, String> generatedTaskProps = taskConfig(sourceSink);
        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
                .andReturn(Collections.singletonList(generatedTaskProps));

        // Config snapshot the herder is expected to hand to the worker when starting task 0.
        ClusterConfigState expectedState = new ClusterConfigState(
                -1L,
                Collections.singletonMap(CONNECTOR_NAME, Integer.valueOf(DEFAULT_MAX_TASKS)),
                Collections.singletonMap(CONNECTOR_NAME, connectorConfig(sourceSink)),
                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
                Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps),
                new HashSet<>(),
                transformer);
        EasyMock.expect(worker.startTask(
                new ConnectorTaskId(CONNECTOR_NAME, 0),
                expectedState,
                connectorConfig(sourceSink),
                generatedTaskProps,
                herder,
                TargetState.STARTED)).andReturn(true);

        EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
                .andReturn(ConnectorType.SOURCE).anyTimes();
        EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName()))
                .andReturn(ConnectorType.SINK).anyTimes();
        EasyMock.expect(worker.isSinkConnector(CONNECTOR_NAME)).andReturn(sourceSink == SourceSink.SINK);
    }

    /**
     * Records the expectations for stopping the test connector: the worker stops and awaits task 0,
     * then stops the connector and reports success.
     */
    private void expectStop() {
        ConnectorTaskId onlyTask = new ConnectorTaskId(CONNECTOR_NAME, 0);
        worker.stopAndAwaitTasks(Collections.singletonList(onlyTask));
        EasyMock.expectLastCall();
        EasyMock.expect(worker.stopConnector(CONNECTOR_NAME)).andReturn(true);
    }

    /**
     * Records the expectations for destroying the test connector; these are exactly the
     * expectations recorded by {@code expectStop()}.
     */
    private void expectDestroy() {
        expectStop();
    }

    /**
     * Builds a fresh, mutable connector config for the test connector.
     *
     * The map always holds {@code "name"}, {@code "connector.class"} (the bogus sink or source
     * connector class name) and {@code "tasks.max"} set to {@code "1"}; sink configs additionally
     * hold {@code "topics"}.
     *
     * @param sourceSink whether to build a source or a sink connector config
     * @return a new map that callers may modify
     */
    private static Map<String, String> connectorConfig(SourceSink sourceSink) {
        final boolean isSink = sourceSink == SourceSink.SINK;
        Map<String, String> props = new HashMap<>();
        props.put("name", CONNECTOR_NAME);
        props.put("connector.class", (isSink ? BogusSinkConnector.class : BogusSourceConnector.class).getName());
        props.put("tasks.max", "1");
        if (isSink) {
            props.put("topics", TOPICS_LIST_STR);
        }
        return props;
    }

    /**
     * Builds a fresh, mutable task config for the test connector's single task.
     *
     * The map always holds {@code "foo"} set to {@code "bar"} and {@code "task.class"} (the bogus
     * sink or source task class name); sink configs additionally hold {@code "topics"}.
     *
     * @param sourceSink whether to build a source or a sink task config
     * @return a new map that callers may modify
     */
    private static Map<String, String> taskConfig(SourceSink sourceSink) {
        final boolean isSink = sourceSink == SourceSink.SINK;
        Map<String, String> props = new HashMap<>();
        props.put("foo", "bar");
        props.put("task.class", (isSink ? BogusSinkTask.class : BogusSourceTask.class).getName());
        if (isSink) {
            props.put("topics", TOPICS_LIST_STR);
        }
        return props;
    }

    /**
     * Records the expectations for one config-validation pass over the given connector mock.
     *
     * The worker hands out the transformer twice and the plugins three times, the single-argument
     * {@code transform} answers with the map it received, the class loaders are swapped and swapped
     * back, and the connector returns an empty {@link ConfigDef} plus an error-free {@link Config}
     * for each supplied config.
     *
     * @param connector the connector mock that validates the configs
     * @param shouldCreateConnector when {@code true}, additionally expects one more
     *        {@code getPlugins()} call and a {@code newConnector} call returning {@code connector}
     * @param configs the configs for which one {@code validate} call each is expected, in order
     */
    private void expectConfigValidation(Connector connector, boolean shouldCreateConnector, Map<String, String>... configs) {
        EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
        final Capture<Map<String, String>> transformedConfig = EasyMock.newCapture();
        EasyMock.expect(transformer.transform(EasyMock.capture(transformedConfig)))
                .andAnswer(transformedConfig::getValue);

        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
        EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(delegatingLoader);
        if (shouldCreateConnector) {
            EasyMock.expect(worker.getPlugins()).andReturn(plugins);
            EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connector);
        }
        EasyMock.expect(connector.config()).andStubReturn(new ConfigDef());

        // One validate() expectation per supplied config.
        for (Map<String, String> config : configs) {
            EasyMock.expect(connector.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
        }
        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
    }
}
