package org.apache.kafka.connect.runtime;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.api.easymock.annotation.MockNice;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({Worker.class, Plugins.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest.class */
public class WorkerTest extends ThreadedTest {
    private static final String CONNECTOR_ID = "test-connector";
    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
    private static final String WORKER_ID = "localhost:8083";
    private WorkerConfig config;
    private Worker worker;

    @Mock
    private Plugins plugins;

    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private DelegatingClassLoader delegatingLoader;

    @Mock
    private OffsetBackingStore offsetBackingStore;

    @MockStrict
    private TaskStatus.Listener taskStatusListener;

    @MockStrict
    private ConnectorStatus.Listener connectorStatusListener;

    @Mock
    private Herder herder;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private Connector connector;

    @Mock
    private ConnectorContext ctx;

    @Mock
    private TestSourceTask task;

    @Mock
    private WorkerSourceTask workerTask;

    @Mock
    private Converter keyConverter;

    @Mock
    private Converter valueConverter;

    @Mock
    private Converter taskKeyConverter;

    @Mock
    private Converter taskValueConverter;

    @Mock
    private HeaderConverter taskHeaderConverter;

    @Mock
    private ExecutorService executorService;

    @MockNice
    private ConnectorConfig connectorConfig;
    private String mockFileProviderTestId;
    private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
    private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
    private Map<String, String> workerProps = new HashMap();
    private Map<String, String> defaultProducerConfigs = new HashMap();
    private Map<String, String> defaultConsumerConfigs = new HashMap();

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$TestConfigurableConverter.class */
    public static class TestConfigurableConverter implements Converter, Configurable {
        public Map<String, ?> configs;

        public ConfigDef config() {
            return JsonConverterConfig.configDef();
        }

        public void configure(Map<String, ?> map) {
            this.configs = map;
            new JsonConverterConfig(map);
        }

        public void configure(Map<String, ?> map, boolean z) {
            this.configs = map;
        }

        public byte[] fromConnectData(String str, Schema schema, Object obj) {
            return new byte[0];
        }

        public SchemaAndValue toConnectData(String str, byte[] bArr) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$TestConverter.class */
    public static class TestConverter implements Converter {
        public Map<String, ?> configs;

        public void configure(Map<String, ?> map, boolean z) {
            this.configs = map;
        }

        public byte[] fromConnectData(String str, Schema schema, Object obj) {
            return new byte[0];
        }

        public SchemaAndValue toConnectData(String str, byte[] bArr) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$TestSourceTask.class */
    private static class TestSourceTask extends SourceTask {
        public String version() {
            return "1.0";
        }

        public void start(Map<String, String> map) {
        }

        public List<SourceRecord> poll() throws InterruptedException {
            return null;
        }

        public void stop() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$WorkerTestConnector.class */
    public static class WorkerTestConnector extends Connector {
        private static final ConfigDef CONFIG_DEF = new ConfigDef().define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName.");

        public String version() {
            return "1.0";
        }

        public void start(Map<String, String> map) {
        }

        public Class<? extends Task> taskClass() {
            return null;
        }

        public List<Map<String, String>> taskConfigs(int i) {
            return null;
        }

        public void stop() {
        }

        public ConfigDef config() {
            return CONFIG_DEF;
        }
    }

    @Override // org.apache.kafka.connect.util.ThreadedTest
    @Before
    public void setup() {
        super.setup();
        this.workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.workerProps.put("internal.key.converter.schemas.enable", "false");
        this.workerProps.put("internal.value.converter.schemas.enable", "false");
        this.workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
        this.workerProps.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        this.workerProps.put("config.providers", "file");
        this.workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
        this.mockFileProviderTestId = UUID.randomUUID().toString();
        this.workerProps.put("config.providers.file.param.testId", this.mockFileProviderTestId);
        this.config = new StandaloneConfig(this.workerProps);
        this.defaultProducerConfigs.put("bootstrap.servers", "localhost:9092");
        this.defaultProducerConfigs.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.defaultProducerConfigs.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.defaultProducerConfigs.put("request.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        this.defaultProducerConfigs.put("max.block.ms", Long.toString(Long.MAX_VALUE));
        this.defaultProducerConfigs.put("acks", "all");
        this.defaultProducerConfigs.put("max.in.flight.requests.per.connection", "1");
        this.defaultProducerConfigs.put("delivery.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        this.defaultConsumerConfigs.put("bootstrap.servers", "localhost:9092");
        this.defaultConsumerConfigs.put("enable.auto.commit", "false");
        this.defaultConsumerConfigs.put("auto.offset.reset", "earliest");
        this.defaultConsumerConfigs.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        this.defaultConsumerConfigs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        PowerMock.mockStatic(Plugins.class);
    }

    @Test
    public void testStartAndStopConnector() {
        expectConverters();
        expectStartStorage();
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader).times(2);
        EasyMock.expect(this.plugins.newConnector(WorkerTestConnector.class.getName())).andReturn(this.connector);
        EasyMock.expect(this.connector.version()).andReturn("1.0");
        HashMap hashMap = new HashMap();
        hashMap.put("topics", "foo,bar");
        hashMap.put("tasks.max", "1");
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", WorkerTestConnector.class.getName());
        EasyMock.expect(this.connector.version()).andReturn("1.0");
        expectFileConfigProvider();
        EasyMock.expect(this.plugins.compareAndSwapLoaders(this.connector)).andReturn(this.delegatingLoader).times(2);
        this.connector.initialize((ConnectorContext) EasyMock.anyObject(ConnectorContext.class));
        EasyMock.expectLastCall();
        this.connector.start(hashMap);
        EasyMock.expectLastCall();
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader).times(2);
        this.connectorStatusListener.onStartup(CONNECTOR_ID);
        EasyMock.expectLastCall();
        this.connector.stop();
        EasyMock.expectLastCall();
        this.connectorStatusListener.onShutdown(CONNECTOR_ID);
        EasyMock.expectLastCall();
        expectStopStorage();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.startConnector(CONNECTOR_ID, hashMap, this.ctx, this.connectorStatusListener, TargetState.STARTED);
        Assert.assertEquals(new HashSet(Arrays.asList(CONNECTOR_ID)), this.worker.connectorNames());
        try {
            this.worker.startConnector(CONNECTOR_ID, hashMap, this.ctx, this.connectorStatusListener, TargetState.STARTED);
            Assert.fail("Should have thrown exception when trying to add connector with same name.");
        } catch (ConnectException e) {
        }
        assertStatistics(this.worker, 1, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        this.worker.stopConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        PowerMock.verifyAll();
        MockFileConfigProvider.assertClosed(this.mockFileProviderTestId);
    }

    private void expectFileConfigProvider() {
        EasyMock.expect(this.plugins.newConfigProvider((AbstractConfig) EasyMock.anyObject(), (String) EasyMock.eq("config.providers.file"), (Plugins.ClassLoaderUsage) EasyMock.anyObject())).andAnswer(() -> {
            MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
            mockFileConfigProvider.configure(Collections.singletonMap("testId", this.mockFileProviderTestId));
            return mockFileConfigProvider;
        });
    }

    @Test
    public void testStartConnectorFailure() {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        HashMap hashMap = new HashMap();
        hashMap.put("topics", "foo,bar");
        hashMap.put("tasks.max", "1");
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", "java.util.HashMap");
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader);
        EasyMock.expect(this.plugins.newConnector(EasyMock.anyString())).andThrow(new ConnectException("Failed to find Connector"));
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader);
        this.connectorStatusListener.onFailure((String) EasyMock.eq(CONNECTOR_ID), (Throwable) EasyMock.anyObject());
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertFalse(this.worker.startConnector(CONNECTOR_ID, hashMap, this.ctx, this.connectorStatusListener, TargetState.STARTED));
        assertStartupStatistics(this.worker, 1, 1, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 1, 0, 0);
        Assert.assertFalse(this.worker.stopConnector(CONNECTOR_ID));
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 1, 0, 0);
        PowerMock.verifyAll();
    }

    @Test
    public void testAddConnectorByAlias() {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader).times(2);
        EasyMock.expect(this.plugins.newConnector("WorkerTestConnector")).andReturn(this.connector);
        EasyMock.expect(this.connector.version()).andReturn("1.0");
        HashMap hashMap = new HashMap();
        hashMap.put("topics", "foo,bar");
        hashMap.put("tasks.max", "1");
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", "WorkerTestConnector");
        EasyMock.expect(this.connector.version()).andReturn("1.0");
        EasyMock.expect(this.plugins.compareAndSwapLoaders(this.connector)).andReturn(this.delegatingLoader).times(2);
        this.connector.initialize((ConnectorContext) EasyMock.anyObject(ConnectorContext.class));
        EasyMock.expectLastCall();
        this.connector.start(hashMap);
        EasyMock.expectLastCall();
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader).times(2);
        this.connectorStatusListener.onStartup(CONNECTOR_ID);
        EasyMock.expectLastCall();
        this.connector.stop();
        EasyMock.expectLastCall();
        this.connectorStatusListener.onShutdown(CONNECTOR_ID);
        EasyMock.expectLastCall();
        expectStopStorage();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.startConnector(CONNECTOR_ID, hashMap, this.ctx, this.connectorStatusListener, TargetState.STARTED);
        Assert.assertEquals(new HashSet(Arrays.asList(CONNECTOR_ID)), this.worker.connectorNames());
        assertStatistics(this.worker, 1, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        this.worker.stopConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        PowerMock.verifyAll();
    }

    @Test
    public void testAddConnectorByShortAlias() {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader).times(2);
        EasyMock.expect(this.plugins.newConnector("WorkerTest")).andReturn(this.connector);
        EasyMock.expect(this.connector.version()).andReturn("1.0");
        HashMap hashMap = new HashMap();
        hashMap.put("topics", "foo,bar");
        hashMap.put("tasks.max", "1");
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", "WorkerTest");
        EasyMock.expect(this.connector.version()).andReturn("1.0");
        EasyMock.expect(this.plugins.compareAndSwapLoaders(this.connector)).andReturn(this.delegatingLoader).times(2);
        this.connector.initialize((ConnectorContext) EasyMock.anyObject(ConnectorContext.class));
        EasyMock.expectLastCall();
        this.connector.start(hashMap);
        EasyMock.expectLastCall();
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader).times(2);
        this.connectorStatusListener.onStartup(CONNECTOR_ID);
        EasyMock.expectLastCall();
        this.connector.stop();
        EasyMock.expectLastCall();
        this.connectorStatusListener.onShutdown(CONNECTOR_ID);
        EasyMock.expectLastCall();
        expectStopStorage();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.startConnector(CONNECTOR_ID, hashMap, this.ctx, this.connectorStatusListener, TargetState.STARTED);
        Assert.assertEquals(new HashSet(Arrays.asList(CONNECTOR_ID)), this.worker.connectorNames());
        assertStatistics(this.worker, 1, 0);
        this.worker.stopConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        PowerMock.verifyAll();
    }

    @Test
    public void testStopInvalidConnector() {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        this.worker.stopConnector(CONNECTOR_ID);
        PowerMock.verifyAll();
    }

    @Test
    public void testReconfigureConnectorTasks() {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader).times(3);
        EasyMock.expect(this.plugins.newConnector(WorkerTestConnector.class.getName())).andReturn(this.connector);
        EasyMock.expect(this.connector.version()).andReturn("1.0");
        HashMap hashMap = new HashMap();
        hashMap.put("topics", "foo,bar");
        hashMap.put("tasks.max", "1");
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", WorkerTestConnector.class.getName());
        EasyMock.expect(this.connector.version()).andReturn("1.0");
        EasyMock.expect(this.plugins.compareAndSwapLoaders(this.connector)).andReturn(this.delegatingLoader).times(3);
        this.connector.initialize((ConnectorContext) EasyMock.anyObject(ConnectorContext.class));
        EasyMock.expectLastCall();
        this.connector.start(hashMap);
        EasyMock.expectLastCall();
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader).times(3);
        this.connectorStatusListener.onStartup(CONNECTOR_ID);
        EasyMock.expectLastCall();
        EasyMock.expect(this.connector.taskClass()).andReturn(TestSourceTask.class);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("foo", "bar");
        EasyMock.expect(this.connector.taskConfigs(2)).andReturn(Arrays.asList(hashMap2, hashMap2));
        this.connector.stop();
        EasyMock.expectLastCall();
        this.connectorStatusListener.onShutdown(CONNECTOR_ID);
        EasyMock.expectLastCall();
        expectStopStorage();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.startConnector(CONNECTOR_ID, hashMap, this.ctx, this.connectorStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 1, 0);
        Assert.assertEquals(new HashSet(Arrays.asList(CONNECTOR_ID)), this.worker.connectorNames());
        try {
            this.worker.startConnector(CONNECTOR_ID, hashMap, this.ctx, this.connectorStatusListener, TargetState.STARTED);
            Assert.fail("Should have thrown exception when trying to add connector with same name.");
        } catch (ConnectException e) {
        }
        HashMap hashMap3 = new HashMap(hashMap);
        hashMap3.put("tasks.max", "2");
        List connectorTaskConfigs = this.worker.connectorTaskConfigs(CONNECTOR_ID, new SinkConnectorConfig(this.plugins, hashMap3));
        HashMap hashMap4 = new HashMap();
        hashMap4.put("foo", "bar");
        hashMap4.put("task.class", TestSourceTask.class.getName());
        hashMap4.put("topics", "foo,bar");
        Assert.assertEquals(2L, connectorTaskConfigs.size());
        Assert.assertEquals(hashMap4, connectorTaskConfigs.get(0));
        Assert.assertEquals(hashMap4, connectorTaskConfigs.get(1));
        assertStatistics(this.worker, 1, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        this.worker.stopConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        PowerMock.verifyAll();
    }

    @Test
    public void testAddRemoveTask() throws Exception {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        EasyMock.expect(this.workerTask.id()).andStubReturn(TASK_ID);
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader).times(2);
        PowerMock.expectNew(WorkerSourceTask.class, new Object[]{EasyMock.eq(TASK_ID), EasyMock.eq(this.task), EasyMock.anyObject(TaskStatus.Listener.class), EasyMock.eq(TargetState.STARTED), EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(JsonConverter.class), EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR)), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.eq(this.config), EasyMock.anyObject(ClusterConfigState.class), EasyMock.anyObject(ConnectMetrics.class), EasyMock.anyObject(ClassLoader.class), EasyMock.anyObject(Time.class), EasyMock.anyObject(RetryWithToleranceOperator.class), EasyMock.anyObject(StatusBackingStore.class), EasyMock.anyObject(Executor.class)}).andReturn(this.workerTask);
        HashMap hashMap = new HashMap();
        hashMap.put("task.class", TestSourceTask.class.getName());
        TaskConfig taskConfig = new TaskConfig(hashMap);
        EasyMock.expect(this.plugins.newTask(TestSourceTask.class)).andReturn(this.task);
        EasyMock.expect(this.task.version()).andReturn("1.0");
        this.workerTask.initialize(taskConfig);
        EasyMock.expectLastCall();
        Assert.assertNotNull(this.taskKeyConverter);
        Assert.assertNotNull(this.taskValueConverter);
        Assert.assertNotNull(this.taskHeaderConverter);
        expectTaskKeyConverters(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskKeyConverter);
        expectTaskValueConverters(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskValueConverter);
        expectTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskHeaderConverter);
        EasyMock.expect(this.executorService.submit((Runnable) this.workerTask)).andReturn((Object) null);
        EasyMock.expect(this.plugins.delegatingLoader()).andReturn(this.delegatingLoader);
        EasyMock.expect(this.delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())).andReturn(this.pluginLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.pluginLoader)).andReturn(this.delegatingLoader).times(2);
        EasyMock.expect(this.workerTask.loader()).andReturn(this.pluginLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader).times(2);
        this.plugins.connectorClass(WorkerTestConnector.class.getName());
        EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
        this.workerTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(this.workerTask.awaitStop(EasyMock.anyLong()))).andStubReturn(true);
        EasyMock.expectLastCall();
        this.workerTask.removeMetrics();
        EasyMock.expectLastCall();
        expectStopStorage();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), hashMap, this.taskStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 0, 1);
        assertStartupStatistics(this.worker, 0, 0, 1, 0);
        Assert.assertEquals(new HashSet(Arrays.asList(TASK_ID)), this.worker.taskIds());
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 1, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 1, 0);
        PowerMock.verifyAll();
    }

    @Test
    public void testTaskStatusMetricsStatuses() throws Exception {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        EasyMock.expect(this.workerTask.id()).andStubReturn(TASK_ID);
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader).times(2);
        PowerMock.expectNew(WorkerSourceTask.class, new Object[]{EasyMock.eq(TASK_ID), EasyMock.eq(this.task), EasyMock.anyObject(TaskStatus.Listener.class), EasyMock.eq(TargetState.STARTED), EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(JsonConverter.class), EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR)), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.eq(this.config), EasyMock.anyObject(ClusterConfigState.class), EasyMock.anyObject(ConnectMetrics.class), EasyMock.anyObject(ClassLoader.class), EasyMock.anyObject(Time.class), EasyMock.anyObject(RetryWithToleranceOperator.class), EasyMock.anyObject(StatusBackingStore.class), EasyMock.anyObject(Executor.class)}).andReturn(this.workerTask);
        HashMap hashMap = new HashMap();
        hashMap.put("task.class", TestSourceTask.class.getName());
        TaskConfig taskConfig = new TaskConfig(hashMap);
        EasyMock.expect(this.plugins.newTask(TestSourceTask.class)).andReturn(this.task);
        EasyMock.expect(this.task.version()).andReturn("1.0");
        this.workerTask.initialize(taskConfig);
        EasyMock.expectLastCall();
        Assert.assertNotNull(this.taskKeyConverter);
        Assert.assertNotNull(this.taskValueConverter);
        Assert.assertNotNull(this.taskHeaderConverter);
        expectTaskKeyConverters(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskKeyConverter);
        expectTaskValueConverters(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskValueConverter);
        expectTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskHeaderConverter);
        EasyMock.expect(this.executorService.submit((Runnable) this.workerTask)).andReturn((Object) null);
        EasyMock.expect(this.plugins.delegatingLoader()).andReturn(this.delegatingLoader);
        EasyMock.expect(this.delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())).andReturn(this.pluginLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.pluginLoader)).andReturn(this.delegatingLoader).times(2);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader).times(2);
        this.plugins.connectorClass(WorkerTestConnector.class.getName());
        EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
        EasyMock.expect(Boolean.valueOf(this.workerTask.awaitStop(EasyMock.anyLong()))).andStubReturn(true);
        EasyMock.expectLastCall();
        this.workerTask.removeMetrics();
        EasyMock.expectLastCall();
        this.herder.taskStatus(TASK_ID);
        EasyMock.expectLastCall().andReturn(new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "msg"));
        this.herder.taskStatus(TASK_ID);
        EasyMock.expectLastCall().andReturn(new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg"));
        this.herder.taskStatus(TASK_ID);
        EasyMock.expectLastCall().andReturn(new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg"));
        this.herder.taskStatus(TASK_ID);
        EasyMock.expectLastCall().andReturn(new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker", "msg"));
        this.herder.taskStatus(TASK_ID);
        EasyMock.expectLastCall().andReturn(new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker", "msg"));
        EasyMock.expect(this.workerTask.loader()).andReturn(this.pluginLoader);
        this.workerTask.stop();
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), hashMap, this.taskStatusListener, TargetState.STARTED);
        assertStatusMetrics(1L, "connector-running-task-count");
        assertStatusMetrics(1L, "connector-paused-task-count");
        assertStatusMetrics(1L, "connector-failed-task-count");
        assertStatusMetrics(1L, "connector-destroyed-task-count");
        assertStatusMetrics(1L, "connector-unassigned-task-count");
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatusMetrics(0L, "connector-running-task-count");
        assertStatusMetrics(0L, "connector-paused-task-count");
        assertStatusMetrics(0L, "connector-failed-task-count");
        assertStatusMetrics(0L, "connector-destroyed-task-count");
        assertStatusMetrics(0L, "connector-unassigned-task-count");
        PowerMock.verifyAll();
    }

    @Test
    public void testConnectorStatusMetricsGroup_taskStatusCounter() {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.put(new ConnectorTaskId("c1", 0), this.workerTask);
        concurrentHashMap.put(new ConnectorTaskId("c1", 1), this.workerTask);
        concurrentHashMap.put(new ConnectorTaskId("c2", 0), this.workerTask);
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.pluginLoader)).andReturn(this.delegatingLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.pluginLoader)).andReturn(this.delegatingLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader);
        this.taskStatusListener.onFailure((ConnectorTaskId) EasyMock.eq(TASK_ID), (Throwable) EasyMock.anyObject());
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        Worker.ConnectorStatusMetricsGroup connectorStatusMetricsGroup = new Worker.ConnectorStatusMetricsGroup(this.worker.metrics(), concurrentHashMap, this.herder);
        Assert.assertEquals(2L, ((Long) connectorStatusMetricsGroup.taskCounter("c1").metricValue(0L)).longValue());
        Assert.assertEquals(1L, ((Long) connectorStatusMetricsGroup.taskCounter("c2").metricValue(0L)).longValue());
        Assert.assertEquals(0L, ((Long) connectorStatusMetricsGroup.taskCounter("fakeConnector").metricValue(0L)).longValue());
    }

    @Test
    public void testStartTaskFailure() {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        HashMap hashMap = new HashMap();
        hashMap.put("task.class", "missing.From.This.Workers.Classpath");
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader);
        EasyMock.expect(this.plugins.delegatingLoader()).andReturn(this.delegatingLoader);
        EasyMock.expect(this.delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())).andReturn(this.pluginLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.pluginLoader)).andReturn(this.delegatingLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader);
        this.taskStatusListener.onFailure((ConnectorTaskId) EasyMock.eq(TASK_ID), (Throwable) EasyMock.anyObject());
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 0, 0);
        Assert.assertFalse(this.worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), hashMap, this.taskStatusListener, TargetState.STARTED));
        assertStartupStatistics(this.worker, 0, 0, 1, 1);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 1, 1);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        PowerMock.verifyAll();
    }

    @Test
    public void testCleanupTasksOnStop() throws Exception {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        EasyMock.expect(this.workerTask.id()).andStubReturn(TASK_ID);
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader).times(2);
        PowerMock.expectNew(WorkerSourceTask.class, new Object[]{EasyMock.eq(TASK_ID), EasyMock.eq(this.task), EasyMock.anyObject(TaskStatus.Listener.class), EasyMock.eq(TargetState.STARTED), EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(JsonConverter.class), EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR)), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(ClusterConfigState.class), EasyMock.anyObject(ConnectMetrics.class), EasyMock.eq(this.pluginLoader), EasyMock.anyObject(Time.class), EasyMock.anyObject(RetryWithToleranceOperator.class), EasyMock.anyObject(StatusBackingStore.class), EasyMock.anyObject(Executor.class)}).andReturn(this.workerTask);
        HashMap hashMap = new HashMap();
        hashMap.put("task.class", TestSourceTask.class.getName());
        TaskConfig taskConfig = new TaskConfig(hashMap);
        EasyMock.expect(this.plugins.newTask(TestSourceTask.class)).andReturn(this.task);
        EasyMock.expect(this.task.version()).andReturn("1.0");
        this.workerTask.initialize(taskConfig);
        EasyMock.expectLastCall();
        Assert.assertNotNull(this.taskKeyConverter);
        Assert.assertNotNull(this.taskValueConverter);
        Assert.assertNotNull(this.taskHeaderConverter);
        expectTaskKeyConverters(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, null);
        expectTaskKeyConverters(Plugins.ClassLoaderUsage.PLUGINS, this.taskKeyConverter);
        expectTaskValueConverters(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, null);
        expectTaskValueConverters(Plugins.ClassLoaderUsage.PLUGINS, this.taskValueConverter);
        expectTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, null);
        expectTaskHeaderConverter(Plugins.ClassLoaderUsage.PLUGINS, this.taskHeaderConverter);
        EasyMock.expect(this.executorService.submit((Runnable) this.workerTask)).andReturn((Object) null);
        EasyMock.expect(this.plugins.delegatingLoader()).andReturn(this.delegatingLoader);
        EasyMock.expect(this.delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())).andReturn(this.pluginLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.pluginLoader)).andReturn(this.delegatingLoader).times(2);
        EasyMock.expect(this.workerTask.loader()).andReturn(this.pluginLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader).times(2);
        this.plugins.connectorClass(WorkerTestConnector.class.getName());
        EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
        this.workerTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(this.workerTask.awaitStop(EasyMock.anyLong()))).andReturn(true);
        EasyMock.expectLastCall();
        this.workerTask.removeMetrics();
        EasyMock.expectLastCall();
        expectStopStorage();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        this.worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), hashMap, this.taskStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 0, 1);
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        PowerMock.verifyAll();
    }

    @Test
    public void testConverterOverrides() throws Exception {
        expectConverters();
        expectStartStorage();
        expectFileConfigProvider();
        EasyMock.expect(this.workerTask.id()).andStubReturn(TASK_ID);
        Capture newCapture = EasyMock.newCapture();
        Capture newCapture2 = EasyMock.newCapture();
        Capture newCapture3 = EasyMock.newCapture();
        EasyMock.expect(this.plugins.currentThreadLoader()).andReturn(this.delegatingLoader).times(2);
        PowerMock.expectNew(WorkerSourceTask.class, new Object[]{EasyMock.eq(TASK_ID), EasyMock.eq(this.task), EasyMock.anyObject(TaskStatus.Listener.class), EasyMock.eq(TargetState.STARTED), EasyMock.capture(newCapture), EasyMock.capture(newCapture2), EasyMock.capture(newCapture3), EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR)), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(ClusterConfigState.class), EasyMock.anyObject(ConnectMetrics.class), EasyMock.eq(this.pluginLoader), EasyMock.anyObject(Time.class), EasyMock.anyObject(RetryWithToleranceOperator.class), EasyMock.anyObject(StatusBackingStore.class), EasyMock.anyObject(Executor.class)}).andReturn(this.workerTask);
        HashMap hashMap = new HashMap();
        hashMap.put("task.class", TestSourceTask.class.getName());
        TaskConfig taskConfig = new TaskConfig(hashMap);
        EasyMock.expect(this.plugins.newTask(TestSourceTask.class)).andReturn(this.task);
        EasyMock.expect(this.task.version()).andReturn("1.0");
        this.workerTask.initialize(taskConfig);
        EasyMock.expectLastCall();
        Assert.assertNotNull(this.taskKeyConverter);
        Assert.assertNotNull(this.taskValueConverter);
        Assert.assertNotNull(this.taskHeaderConverter);
        expectTaskKeyConverters(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, null);
        expectTaskKeyConverters(Plugins.ClassLoaderUsage.PLUGINS, this.taskKeyConverter);
        expectTaskValueConverters(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, null);
        expectTaskValueConverters(Plugins.ClassLoaderUsage.PLUGINS, this.taskValueConverter);
        expectTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, null);
        expectTaskHeaderConverter(Plugins.ClassLoaderUsage.PLUGINS, this.taskHeaderConverter);
        EasyMock.expect(this.executorService.submit((Runnable) this.workerTask)).andReturn((Object) null);
        EasyMock.expect(this.plugins.delegatingLoader()).andReturn(this.delegatingLoader);
        EasyMock.expect(this.delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())).andReturn(this.pluginLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.pluginLoader)).andReturn(this.delegatingLoader).times(2);
        EasyMock.expect(this.workerTask.loader()).andReturn(this.pluginLoader);
        EasyMock.expect(Plugins.compareAndSwapLoaders(this.delegatingLoader)).andReturn(this.pluginLoader).times(2);
        this.plugins.connectorClass(WorkerTestConnector.class.getName());
        EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
        this.workerTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(this.workerTask.awaitStop(EasyMock.anyLong()))).andStubReturn(true);
        EasyMock.expectLastCall();
        this.workerTask.removeMetrics();
        EasyMock.expectLastCall();
        expectStopStorage();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        Map<String, String> anyConnectorConfigMap = anyConnectorConfigMap();
        anyConnectorConfigMap.put("key.converter", TestConverter.class.getName());
        anyConnectorConfigMap.put("key.converter.extra.config", "foo");
        anyConnectorConfigMap.put("value.converter", TestConfigurableConverter.class.getName());
        anyConnectorConfigMap.put("value.converter.extra.config", "bar");
        this.worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap, hashMap, this.taskStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 0, 1);
        Assert.assertEquals(new HashSet(Arrays.asList(TASK_ID)), this.worker.taskIds());
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        PowerMock.verifyAll();
    }

    @Test
    public void testProducerConfigsWithoutOverrides() {
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("producer.override.")).andReturn(new HashMap());
        PowerMock.replayAll(new Object[0]);
        HashMap hashMap = new HashMap(this.defaultProducerConfigs);
        hashMap.put("client.id", "connector-producer-job-0");
        Assert.assertEquals(hashMap, Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, this.config, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy));
    }

    @Test
    public void testProducerConfigsWithOverrides() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("producer.acks", "-1");
        hashMap.put("producer.linger.ms", "1000");
        hashMap.put("producer.client.id", "producer-test-id");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultProducerConfigs);
        hashMap2.put("acks", "-1");
        hashMap2.put("linger.ms", "1000");
        hashMap2.put("client.id", "producer-test-id");
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("producer.override.")).andReturn(new HashMap());
        PowerMock.replayAll(new Object[0]);
        Assert.assertEquals(hashMap2, Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy));
    }

    @Test
    public void testProducerConfigsWithClientOverrides() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("producer.acks", "-1");
        hashMap.put("producer.linger.ms", "1000");
        hashMap.put("producer.client.id", "producer-test-id");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultProducerConfigs);
        hashMap2.put("acks", "-1");
        hashMap2.put("linger.ms", "5000");
        hashMap2.put("batch.size", "1000");
        hashMap2.put("client.id", "producer-test-id");
        HashMap hashMap3 = new HashMap();
        hashMap3.put("linger.ms", "5000");
        hashMap3.put("batch.size", "1000");
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("producer.override.")).andReturn(hashMap3);
        PowerMock.replayAll(new Object[0]);
        Assert.assertEquals(hashMap2, Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy));
    }

    @Test
    public void testConsumerConfigsWithoutOverrides() {
        HashMap hashMap = new HashMap(this.defaultConsumerConfigs);
        hashMap.put("group.id", "connect-test");
        hashMap.put("client.id", "connector-consumer-test-1");
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("consumer.override.")).andReturn(new HashMap());
        PowerMock.replayAll(new Object[0]);
        Assert.assertEquals(hashMap, Worker.consumerConfigs(new ConnectorTaskId("test", 1), this.config, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy));
    }

    @Test
    public void testConsumerConfigsWithOverrides() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("consumer.auto.offset.reset", "latest");
        hashMap.put("consumer.max.poll.records", "1000");
        hashMap.put("consumer.client.id", "consumer-test-id");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultConsumerConfigs);
        hashMap2.put("group.id", "connect-test");
        hashMap2.put("auto.offset.reset", "latest");
        hashMap2.put("max.poll.records", "1000");
        hashMap2.put("client.id", "consumer-test-id");
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("consumer.override.")).andReturn(new HashMap());
        PowerMock.replayAll(new Object[0]);
        Assert.assertEquals(hashMap2, Worker.consumerConfigs(new ConnectorTaskId("test", 1), standaloneConfig, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy));
    }

    @Test
    public void testConsumerConfigsWithClientOverrides() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("consumer.auto.offset.reset", "latest");
        hashMap.put("consumer.max.poll.records", "5000");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultConsumerConfigs);
        hashMap2.put("group.id", "connect-test");
        hashMap2.put("auto.offset.reset", "latest");
        hashMap2.put("max.poll.records", "5000");
        hashMap2.put("max.poll.interval.ms", "1000");
        hashMap2.put("client.id", "connector-consumer-test-1");
        HashMap hashMap3 = new HashMap();
        hashMap3.put("max.poll.records", "5000");
        hashMap3.put("max.poll.interval.ms", "1000");
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("consumer.override.")).andReturn(hashMap3);
        PowerMock.replayAll(new Object[0]);
        Assert.assertEquals(hashMap2, Worker.consumerConfigs(new ConnectorTaskId("test", 1), standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy));
    }

    @Test(expected = ConnectException.class)
    public void testConsumerConfigsClientOverridesWithNonePolicy() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("consumer.auto.offset.reset", "latest");
        hashMap.put("consumer.max.poll.records", "5000");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("max.poll.records", "5000");
        hashMap2.put("max.poll.interval.ms", "1000");
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("consumer.override.")).andReturn(hashMap2);
        PowerMock.replayAll(new Object[0]);
        Worker.consumerConfigs(new ConnectorTaskId("test", 1), standaloneConfig, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy);
    }

    @Test
    public void testAdminConfigsClientOverridesWithAllPolicy() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("admin.client.id", "testid");
        hashMap.put("admin.metadata.max.age.ms", "5000");
        hashMap.put("producer.bootstrap.servers", "cbeauho.com");
        hashMap.put("consumer.bootstrap.servers", "localhost:4761");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("metadata.max.age.ms", "10000");
        HashMap hashMap3 = new HashMap(this.workerProps);
        hashMap3.put("bootstrap.servers", "localhost:9092");
        hashMap3.put("client.id", "testid");
        hashMap3.put("metadata.max.age.ms", "10000");
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("admin.override.")).andReturn(hashMap2);
        PowerMock.replayAll(new Object[0]);
        Assert.assertEquals(hashMap3, Worker.adminConfigs(new ConnectorTaskId("test", 1), standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy));
    }

    @Test(expected = ConnectException.class)
    public void testAdminConfigsClientOverridesWithNonePolicy() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("admin.client.id", "testid");
        hashMap.put("admin.metadata.max.age.ms", "5000");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("metadata.max.age.ms", "10000");
        EasyMock.expect(this.connectorConfig.originalsWithPrefix("admin.override.")).andReturn(hashMap2);
        PowerMock.replayAll(new Object[0]);
        Worker.adminConfigs(new ConnectorTaskId("test", 1), standaloneConfig, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy);
    }

    private void assertStatusMetrics(long j, String str) {
        ConnectMetrics.MetricGroup metricGroup = this.worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector());
        if (j == 0) {
            Assert.assertNull(metricGroup);
        } else {
            Assert.assertEquals(j, ((Long) MockConnectMetrics.currentMetricValue(this.worker.metrics(), metricGroup, str)).longValue());
        }
    }

    private void assertStatistics(Worker worker, int i, int i2) {
        ConnectMetrics.MetricGroup metricGroup = worker.workerMetricsGroup().metricGroup();
        Assert.assertEquals(i, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-count"), 1.0E-4d);
        Assert.assertEquals(i2, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-count"), 1.0E-4d);
        Assert.assertEquals(i2, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-count"), 1.0E-4d);
    }

    private void assertStartupStatistics(Worker worker, int i, int i2, int i3, int i4) {
        double d = i - i2;
        double d2 = i3 - i4;
        double d3 = 0.0d;
        double d4 = 0.0d;
        double d5 = 0.0d;
        double d6 = 0.0d;
        if (i != 0) {
            d3 = d / i;
            d4 = i2 / i;
        }
        if (i3 != 0) {
            d5 = d2 / i3;
            d6 = i4 / i3;
        }
        ConnectMetrics.MetricGroup metricGroup = worker.workerMetricsGroup().metricGroup();
        Assert.assertEquals(i, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-attempts-total"), 1.0E-4d);
        Assert.assertEquals(d, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-success-total"), 1.0E-4d);
        Assert.assertEquals(i2, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-failure-total"), 1.0E-4d);
        Assert.assertEquals(d3, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-success-percentage"), 1.0E-4d);
        Assert.assertEquals(d4, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-failure-percentage"), 1.0E-4d);
        Assert.assertEquals(i3, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-attempts-total"), 1.0E-4d);
        Assert.assertEquals(d2, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-success-total"), 1.0E-4d);
        Assert.assertEquals(i4, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-failure-total"), 1.0E-4d);
        Assert.assertEquals(d5, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-success-percentage"), 1.0E-4d);
        Assert.assertEquals(d6, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-failure-percentage"), 1.0E-4d);
    }

    private void expectStartStorage() {
        this.offsetBackingStore.configure((WorkerConfig) EasyMock.anyObject(WorkerConfig.class));
        EasyMock.expectLastCall();
        this.offsetBackingStore.start();
        EasyMock.expectLastCall();
        EasyMock.expect(this.herder.statusBackingStore()).andReturn(this.statusBackingStore).anyTimes();
    }

    private void expectStopStorage() {
        this.offsetBackingStore.stop();
        EasyMock.expectLastCall();
    }

    private void expectConverters() {
        expectConverters(JsonConverter.class, false);
    }

    private void expectConverters(Boolean bool) {
        expectConverters(JsonConverter.class, bool);
    }

    private void expectConverters(Class<? extends Converter> cls, Boolean bool) {
        if (bool.booleanValue()) {
            EasyMock.expect(EasyMock.eq(this.plugins.newConverter(this.config, "key.converter", Plugins.ClassLoaderUsage.PLUGINS))).andReturn(this.keyConverter);
            EasyMock.expect(EasyMock.eq(this.plugins.newConverter(this.config, "value.converter", Plugins.ClassLoaderUsage.PLUGINS))).andReturn(this.valueConverter);
            EasyMock.expectLastCall();
        }
        Converter converter = (Converter) PowerMock.createMock(cls);
        Converter converter2 = (Converter) PowerMock.createMock(cls);
        EasyMock.expect(this.plugins.newConverter(this.config, "internal.key.converter", Plugins.ClassLoaderUsage.PLUGINS)).andReturn(converter);
        EasyMock.expect(this.plugins.newConverter(this.config, "internal.value.converter", Plugins.ClassLoaderUsage.PLUGINS)).andReturn(converter2);
        EasyMock.expectLastCall();
    }

    private void expectTaskKeyConverters(Plugins.ClassLoaderUsage classLoaderUsage, Converter converter) {
        EasyMock.expect(this.plugins.newConverter((AbstractConfig) EasyMock.anyObject(AbstractConfig.class), (String) EasyMock.eq("key.converter"), (Plugins.ClassLoaderUsage) EasyMock.eq(classLoaderUsage))).andReturn(converter);
    }

    private void expectTaskValueConverters(Plugins.ClassLoaderUsage classLoaderUsage, Converter converter) {
        EasyMock.expect(this.plugins.newConverter((AbstractConfig) EasyMock.anyObject(AbstractConfig.class), (String) EasyMock.eq("value.converter"), (Plugins.ClassLoaderUsage) EasyMock.eq(classLoaderUsage))).andReturn(converter);
    }

    private void expectTaskHeaderConverter(Plugins.ClassLoaderUsage classLoaderUsage, HeaderConverter headerConverter) {
        EasyMock.expect(this.plugins.newHeaderConverter((AbstractConfig) EasyMock.anyObject(AbstractConfig.class), (String) EasyMock.eq("header.converter"), (Plugins.ClassLoaderUsage) EasyMock.eq(classLoaderUsage))).andReturn(headerConverter);
    }

    private Map<String, String> anyConnectorConfigMap() {
        HashMap hashMap = new HashMap();
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", WorkerTestConnector.class.getName());
        hashMap.put("tasks.max", "1");
        return hashMap;
    }
}
