package org.apache.kafka.connect.runtime;

import io.confluent.logevents.connect.LogEventState;
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.admin.FenceProducersResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerMetricsGroup;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.TopicAdmin;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.MockitoSession;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest.class */
public class WorkerTest {
    private static final String CONNECTOR_ID = "test-connector";
    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
    private static final String WORKER_ID = "localhost:8083";
    private static final String CLUSTER_ID = "test-cluster";
    private WorkerConfig config;
    private Worker worker;

    @Mock
    private Plugins plugins;

    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private LoaderSwap loaderSwap;

    @Mock
    private Runnable isolatedRunnable;

    @Mock
    private OffsetBackingStore offsetBackingStore;

    @Mock
    private TaskStatus.Listener taskStatusListener;

    @Mock
    private ConnectorStatus.Listener connectorStatusListener;

    @Mock
    private Herder herder;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private SourceConnector sourceConnector;

    @Mock
    private SinkConnector sinkConnector;

    @Mock
    private CloseableConnectorContext ctx;

    @Mock
    private TestSourceTask task;

    @Mock
    private Converter taskKeyConverter;

    @Mock
    private Converter taskValueConverter;

    @Mock
    private HeaderConverter taskHeaderConverter;

    @Mock
    private ExecutorService executorService;

    @Mock
    private ConnectorConfig connectorConfig;
    private String mockFileProviderTestId;
    private Map<String, String> connectorProps;
    private final boolean enableTopicCreation;
    private MockedConstruction<WorkerSourceTask> sourceTaskMockedConstruction;
    private MockitoSession mockitoSession;
    private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
    private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
    private final Map<String, String> workerProps = new HashMap();
    private final Map<String, String> defaultProducerConfigs = new HashMap();
    private final Map<String, String> defaultConsumerConfigs = new HashMap();

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$TestSinkTask.class */
    private static class TestSinkTask extends SinkTask {
        public String version() {
            return "1.0";
        }

        public void start(Map<String, String> map) {
        }

        public void put(Collection<SinkRecord> collection) {
        }

        public void stop() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$TestSourceTask.class */
    private static class TestSourceTask extends SourceTask {
        public String version() {
            return "1.0";
        }

        public void start(Map<String, String> map) {
        }

        public List<SourceRecord> poll() {
            return null;
        }

        public void stop() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$WorkerTestSinkConnector.class */
    public static class WorkerTestSinkConnector extends SinkConnector {
        public void start(Map<String, String> map) {
        }

        public Class<? extends Task> taskClass() {
            return TestSinkTask.class;
        }

        public List<Map<String, String>> taskConfigs(int i) {
            return null;
        }

        public void stop() {
        }

        public ConfigDef config() {
            return null;
        }

        public String version() {
            return "1.0";
        }
    }

    @Parameterized.Parameters
    public static Collection<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    public WorkerTest(boolean z) {
        this.enableTopicCreation = z;
    }

    @Before
    public void setup() {
        this.mockitoSession = Mockito.mockitoSession().initMocks(this).strictness(Strictness.STRICT_STUBS).startMocking();
        this.workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
        this.workerProps.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        this.workerProps.put("config.providers", "file");
        this.workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
        this.mockFileProviderTestId = UUID.randomUUID().toString();
        this.workerProps.put("config.providers.file.param.testId", this.mockFileProviderTestId);
        this.workerProps.put("topic.creation.enable", String.valueOf(this.enableTopicCreation));
        this.config = new StandaloneConfig(this.workerProps);
        this.defaultProducerConfigs.put("bootstrap.servers", "localhost:9092");
        this.defaultProducerConfigs.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.defaultProducerConfigs.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.defaultProducerConfigs.put("max.block.ms", Long.toString(Long.MAX_VALUE));
        this.defaultProducerConfigs.put("enable.idempotence", "false");
        this.defaultProducerConfigs.put("acks", "all");
        this.defaultProducerConfigs.put("max.in.flight.requests.per.connection", "1");
        this.defaultProducerConfigs.put("delivery.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        this.defaultConsumerConfigs.put("bootstrap.servers", "localhost:9092");
        this.defaultConsumerConfigs.put("enable.auto.commit", "false");
        this.defaultConsumerConfigs.put("auto.offset.reset", "earliest");
        this.defaultConsumerConfigs.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        this.defaultConsumerConfigs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        this.connectorProps = anyConnectorConfigMap();
        this.sourceTaskMockedConstruction = Mockito.mockConstructionWithAnswer(WorkerSourceTask.class, invocationOnMock -> {
            String name = invocationOnMock.getMethod().getName();
            boolean z = -1;
            switch (name.hashCode()) {
                case -1097519085:
                    if (name.equals("loader")) {
                        z = true;
                        break;
                    }
                    break;
                case 3355:
                    if (name.equals("id")) {
                        z = false;
                        break;
                    }
                    break;
                case 1015896696:
                    if (name.equals("awaitStop")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return TASK_ID;
                case true:
                    return this.pluginLoader;
                case true:
                    return true;
                default:
                    return null;
            }
        }, new Answer[0]);
    }

    @After
    public void teardown() {
        this.sourceTaskMockedConstruction.close();
        this.mockitoSession.finishMocking();
    }

    @Test
    public void testStartAndStopConnector() throws Throwable {
        String name = SampleSourceConnector.class.getName();
        this.connectorProps.put("connector.class", name);
        mockKafkaClusterId();
        mockConnectorIsolation(name, this.sourceConnector);
        mockExecutorRealSubmit(WorkerConnector.class);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.start();
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        FutureCallback futureCallback = new FutureCallback();
        this.worker.startConnector(CONNECTOR_ID, this.connectorProps, this.ctx, this.connectorStatusListener, TargetState.STARTED, futureCallback);
        Assert.assertEquals(TargetState.STARTED, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals(Collections.singleton(CONNECTOR_ID), this.worker.connectorNames());
        FutureCallback futureCallback2 = new FutureCallback();
        this.worker.startConnector(CONNECTOR_ID, this.connectorProps, this.ctx, this.connectorStatusListener, TargetState.STARTED, futureCallback2);
        try {
            futureCallback2.get(0L, TimeUnit.MILLISECONDS);
            Assert.fail("Should have failed while trying to start second connector with same name");
        } catch (ExecutionException e) {
            MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ConnectException.class));
        }
        assertStatistics(this.worker, 1, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        this.worker.stopAndAwaitConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        verifyKafkaClusterId();
        verifyConnectorIsolation(this.sourceConnector);
        verifyExecutorSubmit();
        ((SourceConnector) Mockito.verify(this.sourceConnector)).initialize((ConnectorContext) ArgumentMatchers.any(ConnectorContext.class));
        ((SourceConnector) Mockito.verify(this.sourceConnector)).start(this.connectorProps);
        ((ConnectorStatus.Listener) Mockito.verify(this.connectorStatusListener)).onStartup(CONNECTOR_ID);
        ((SourceConnector) Mockito.verify(this.sourceConnector)).stop();
        ((ConnectorStatus.Listener) Mockito.verify(this.connectorStatusListener)).onShutdown(CONNECTOR_ID);
        ((CloseableConnectorContext) Mockito.verify(this.ctx)).close();
        MockFileConfigProvider.assertClosed(this.mockFileProviderTestId);
    }

    private void mockFileConfigProvider() {
        MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
        mockFileConfigProvider.configure(Collections.singletonMap("testId", this.mockFileProviderTestId));
        Mockito.when(this.plugins.newConfigProvider((AbstractConfig) ArgumentMatchers.any(AbstractConfig.class), (String) ArgumentMatchers.eq("config.providers.file"), (Plugins.ClassLoaderUsage) ArgumentMatchers.any(Plugins.ClassLoaderUsage.class))).thenReturn(mockFileConfigProvider);
    }

    @Test
    public void testStartConnectorFailure() throws Exception {
        this.connectorProps.put("connector.class", "java.util.HashMap");
        Throwable connectException = new ConnectException("Failed to find Connector");
        mockKafkaClusterId();
        mockGenericIsolation();
        Mockito.when(this.plugins.newConnector(ArgumentMatchers.anyString())).thenThrow(new Throwable[]{connectException});
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        FutureCallback futureCallback = new FutureCallback();
        this.worker.startConnector(CONNECTOR_ID, this.connectorProps, this.ctx, this.connectorStatusListener, TargetState.STARTED, futureCallback);
        try {
            futureCallback.get(0L, TimeUnit.MILLISECONDS);
            Assert.fail("Should have failed to start connector");
        } catch (ExecutionException e) {
            Assert.assertEquals(connectException, e.getCause());
        }
        assertStartupStatistics(this.worker, 1, 1, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 1, 0, 0);
        this.worker.stopAndAwaitConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 1, 0, 0);
        ((Plugins) Mockito.verify(this.plugins)).newConnector(ArgumentMatchers.anyString());
        verifyKafkaClusterId();
        verifyGenericIsolation();
        ((ConnectorStatus.Listener) Mockito.verify(this.connectorStatusListener)).onFailure((String) ArgumentMatchers.eq(CONNECTOR_ID), (Throwable) ArgumentMatchers.any(ConnectException.class));
    }

    @Test
    public void testAddConnectorByAlias() throws Throwable {
        mockKafkaClusterId();
        mockConnectorIsolation("SampleSourceConnector", this.sinkConnector);
        mockExecutorRealSubmit(WorkerConnector.class);
        this.connectorProps.put("connector.class", "SampleSourceConnector");
        this.connectorProps.put("topics", "gfieyls, wfru");
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        FutureCallback futureCallback = new FutureCallback();
        this.worker.startConnector(CONNECTOR_ID, this.connectorProps, this.ctx, this.connectorStatusListener, TargetState.STARTED, futureCallback);
        Assert.assertEquals(TargetState.STARTED, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals(Collections.singleton(CONNECTOR_ID), this.worker.connectorNames());
        assertStatistics(this.worker, 1, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        this.worker.stopAndAwaitConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        verifyKafkaClusterId();
        verifyConnectorIsolation(this.sinkConnector);
        verifyExecutorSubmit();
        ((SinkConnector) Mockito.verify(this.sinkConnector)).initialize((ConnectorContext) ArgumentMatchers.any(ConnectorContext.class));
        ((SinkConnector) Mockito.verify(this.sinkConnector)).start(this.connectorProps);
        ((SinkConnector) Mockito.verify(this.sinkConnector)).stop();
        ((ConnectorStatus.Listener) Mockito.verify(this.connectorStatusListener)).onStartup(CONNECTOR_ID);
        ((CloseableConnectorContext) Mockito.verify(this.ctx)).close();
    }

    @Test
    public void testAddConnectorByShortAlias() throws Throwable {
        mockKafkaClusterId();
        mockConnectorIsolation("WorkerTest", this.sinkConnector);
        mockExecutorRealSubmit(WorkerConnector.class);
        this.connectorProps.put("connector.class", "WorkerTest");
        this.connectorProps.put("topics", "gfieyls, wfru");
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        FutureCallback futureCallback = new FutureCallback();
        this.worker.startConnector(CONNECTOR_ID, this.connectorProps, this.ctx, this.connectorStatusListener, TargetState.STARTED, futureCallback);
        Assert.assertEquals(TargetState.STARTED, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals(Collections.singleton(CONNECTOR_ID), this.worker.connectorNames());
        assertStatistics(this.worker, 1, 0);
        this.worker.stopAndAwaitConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        verifyKafkaClusterId();
        verifyConnectorIsolation(this.sinkConnector);
        ((SinkConnector) Mockito.verify(this.sinkConnector)).initialize((ConnectorContext) ArgumentMatchers.any(ConnectorContext.class));
        ((SinkConnector) Mockito.verify(this.sinkConnector)).start(this.connectorProps);
        ((ConnectorStatus.Listener) Mockito.verify(this.connectorStatusListener)).onStartup(CONNECTOR_ID);
        ((SinkConnector) Mockito.verify(this.sinkConnector)).stop();
        ((ConnectorStatus.Listener) Mockito.verify(this.connectorStatusListener)).onShutdown(CONNECTOR_ID);
        ((CloseableConnectorContext) Mockito.verify(this.ctx)).close();
        verifyExecutorSubmit();
    }

    @Test
    public void testStopInvalidConnector() {
        mockKafkaClusterId();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        this.worker.stopAndAwaitConnector(CONNECTOR_ID);
        verifyKafkaClusterId();
        verifyConverters();
    }

    @Test
    public void testReconfigureConnectorTasks() throws Throwable {
        String name = SampleSourceConnector.class.getName();
        mockKafkaClusterId();
        mockConnectorIsolation(name, this.sinkConnector);
        mockExecutorRealSubmit(WorkerConnector.class);
        Map singletonMap = Collections.singletonMap("foo", "bar");
        Mockito.when(this.sinkConnector.taskConfigs(2)).thenReturn(Arrays.asList(singletonMap, singletonMap));
        ((SinkConnector) Mockito.doReturn(TestSourceTask.class).when(this.sinkConnector)).taskClass();
        this.connectorProps.put("topics", "foo,bar");
        this.connectorProps.put("connector.class", name);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        FutureCallback futureCallback = new FutureCallback();
        this.worker.startConnector(CONNECTOR_ID, this.connectorProps, this.ctx, this.connectorStatusListener, TargetState.STARTED, futureCallback);
        Assert.assertEquals(TargetState.STARTED, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        assertStatistics(this.worker, 1, 0);
        Assert.assertEquals(Collections.singleton(CONNECTOR_ID), this.worker.connectorNames());
        FutureCallback futureCallback2 = new FutureCallback();
        this.worker.startConnector(CONNECTOR_ID, this.connectorProps, this.ctx, this.connectorStatusListener, TargetState.STARTED, futureCallback2);
        try {
            futureCallback2.get(0L, TimeUnit.MILLISECONDS);
            Assert.fail("Should have failed while trying to start second connector with same name");
        } catch (ExecutionException e) {
            MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ConnectException.class));
        }
        HashMap hashMap = new HashMap(this.connectorProps);
        hashMap.put("tasks.max", "2");
        List connectorTaskConfigs = this.worker.connectorTaskConfigs(CONNECTOR_ID, new SinkConnectorConfig(this.plugins, hashMap));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("foo", "bar");
        hashMap2.put("task.class", TestSourceTask.class.getName());
        hashMap2.put("topics", "foo,bar");
        Assert.assertEquals(2L, connectorTaskConfigs.size());
        Assert.assertEquals(hashMap2, connectorTaskConfigs.get(0));
        Assert.assertEquals(hashMap2, connectorTaskConfigs.get(1));
        assertStatistics(this.worker, 1, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        this.worker.stopAndAwaitConnector(CONNECTOR_ID);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 1, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        verifyKafkaClusterId();
        verifyConnectorIsolation(this.sinkConnector);
        verifyExecutorSubmit();
        ((SinkConnector) Mockito.verify(this.sinkConnector)).initialize((ConnectorContext) ArgumentMatchers.any(ConnectorContext.class));
        ((SinkConnector) Mockito.verify(this.sinkConnector)).start(this.connectorProps);
        ((ConnectorStatus.Listener) Mockito.verify(this.connectorStatusListener)).onStartup(CONNECTOR_ID);
        ((SinkConnector) Mockito.verify(this.sinkConnector)).taskClass();
        ((SinkConnector) Mockito.verify(this.sinkConnector)).taskConfigs(2);
        ((SinkConnector) Mockito.verify(this.sinkConnector)).stop();
        ((ConnectorStatus.Listener) Mockito.verify(this.connectorStatusListener)).onShutdown(CONNECTOR_ID);
        ((CloseableConnectorContext) Mockito.verify(this.ctx)).close();
    }

    @Test
    public void testAddRemoveSourceTask() {
        mockKafkaClusterId();
        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, this.task);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "key.converter", this.taskKeyConverter);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "value.converter", this.taskValueConverter);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskHeaderConverter);
        mockExecutorFakeSubmit(WorkerTask.class);
        Map singletonMap = Collections.singletonMap("task.class", TestSourceTask.class.getName());
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), singletonMap, this.taskStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 0, 1);
        Assert.assertEquals(Collections.singleton(TASK_ID), this.worker.taskIds());
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        verifyKafkaClusterId();
        verifyTaskIsolation(this.task);
        verifyTaskConverter("key.converter");
        verifyTaskConverter("value.converter");
        verifyTaskHeaderConverter();
        verifyExecutorSubmit();
    }

    @Test
    public void testAddRemoveSinkTask() {
        SinkTask sinkTask = (SinkTask) Mockito.mock(TestSinkTask.class);
        mockKafkaClusterId();
        mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, sinkTask);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "key.converter", this.taskKeyConverter);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "value.converter", this.taskValueConverter);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskHeaderConverter);
        mockExecutorFakeSubmit(WorkerTask.class);
        Map singletonMap = Collections.singletonMap("task.class", TestSinkTask.class.getName());
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        Map<String, String> anyConnectorConfigMap = anyConnectorConfigMap();
        anyConnectorConfigMap.put("topics", "t1");
        anyConnectorConfigMap.put("connector.class", SampleSinkConnector.class.getName());
        this.worker.startSinkTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap, singletonMap, this.taskStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 0, 1);
        Assert.assertEquals(Collections.singleton(TASK_ID), this.worker.taskIds());
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        verifyKafkaClusterId();
        verifyTaskIsolation(sinkTask);
        verifyTaskConverter("key.converter");
        verifyTaskConverter("value.converter");
        verifyTaskHeaderConverter();
        verifyExecutorSubmit();
    }

    @Test
    public void testAddRemoveExactlyOnceSourceTask() {
        HashMap hashMap = new HashMap();
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        hashMap.put("config.providers", "file");
        hashMap.put("config.providers.file.class", MockFileConfigProvider.class.getName());
        this.mockFileProviderTestId = UUID.randomUUID().toString();
        hashMap.put("config.providers.file.param.testId", this.mockFileProviderTestId);
        hashMap.put("topic.creation.enable", String.valueOf(this.enableTopicCreation));
        hashMap.put("group.id", "connect-cluster");
        hashMap.put("bootstrap.servers", "localhost:2606");
        hashMap.put("offset.storage.topic", "connect-offsets");
        hashMap.put("config.storage.topic", "connect-configs");
        hashMap.put("status.storage.topic", "connect-statuses");
        hashMap.put("exactly.once.source.support", "enabled");
        this.config = new DistributedConfig(hashMap);
        mockKafkaClusterId();
        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, this.task);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "key.converter", this.taskKeyConverter);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "value.converter", this.taskValueConverter);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskHeaderConverter);
        mockExecutorFakeSubmit(WorkerTask.class);
        Runnable runnable = (Runnable) Mockito.mock(Runnable.class);
        Runnable runnable2 = (Runnable) Mockito.mock(Runnable.class);
        Map singletonMap = Collections.singletonMap("task.class", TestSourceTask.class.getName());
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), singletonMap, this.taskStatusListener, TargetState.STARTED, runnable, runnable2);
        assertStatistics(this.worker, 0, 1);
        Assert.assertEquals(Collections.singleton(TASK_ID), this.worker.taskIds());
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        verifyKafkaClusterId();
        verifyTaskIsolation(this.task);
        verifyTaskConverter("key.converter");
        verifyTaskConverter("value.converter");
        verifyTaskHeaderConverter();
        verifyExecutorSubmit();
    }

    @Test
    public void testTaskStatusMetricsStatuses() {
        mockInternalConverters();
        mockStorage();
        mockFileConfigProvider();
        Map singletonMap = Collections.singletonMap("task.class", TestSourceTask.class.getName());
        TaskConfig taskConfig = new TaskConfig(singletonMap);
        mockKafkaClusterId();
        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, this.task);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "key.converter", this.taskKeyConverter);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "value.converter", this.taskValueConverter);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskHeaderConverter);
        mockExecutorFakeSubmit(WorkerTask.class);
        Mockito.when(this.herder.taskStatus(TASK_ID)).thenReturn(new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "msg"), new ConnectorStateInfo.TaskState[]{new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg"), new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg"), new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker", "msg"), new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker", "msg")});
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), singletonMap, this.taskStatusListener, TargetState.STARTED);
        assertStatusMetrics(1L, "connector-running-task-count");
        assertStatusMetrics(1L, "connector-paused-task-count");
        assertStatusMetrics(1L, "connector-failed-task-count");
        assertStatusMetrics(1L, "connector-destroyed-task-count");
        assertStatusMetrics(1L, "connector-unassigned-task-count");
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatusMetrics(0L, "connector-running-task-count");
        assertStatusMetrics(0L, "connector-paused-task-count");
        assertStatusMetrics(0L, "connector-failed-task-count");
        assertStatusMetrics(0L, "connector-destroyed-task-count");
        assertStatusMetrics(0L, "connector-unassigned-task-count");
        WorkerSourceTask workerSourceTask = (WorkerSourceTask) this.sourceTaskMockedConstruction.constructed().get(0);
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).initialize(taskConfig);
        ((Herder) Mockito.verify(this.herder, Mockito.times(5))).taskStatus(TASK_ID);
        verifyKafkaClusterId();
        verifyTaskIsolation(this.task);
        verifyExecutorSubmit();
        ((WorkerSourceTask) Mockito.verify(workerSourceTask, Mockito.atLeastOnce())).id();
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).awaitStop(ArgumentMatchers.anyLong());
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).removeMetrics();
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).loader();
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).stop();
        verifyTaskConverter("key.converter");
        verifyTaskConverter("value.converter");
        verifyTaskHeaderConverter();
    }

    @Test
    public void testConnectorStatusMetricsGroup_taskStatusCounter() {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.put(new ConnectorTaskId("c1", 0), Mockito.mock(WorkerSourceTask.class));
        concurrentHashMap.put(new ConnectorTaskId("c1", 1), Mockito.mock(WorkerSourceTask.class));
        concurrentHashMap.put(new ConnectorTaskId("c2", 0), Mockito.mock(WorkerSourceTask.class));
        mockKafkaClusterId();
        mockInternalConverters();
        mockFileConfigProvider();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        Worker.ConnectorStatusMetricsGroup connectorStatusMetricsGroup = new Worker.ConnectorStatusMetricsGroup(this.worker.metrics(), concurrentHashMap, this.herder);
        Assert.assertEquals(2L, ((Long) connectorStatusMetricsGroup.taskCounter("c1").metricValue(0L)).longValue());
        Assert.assertEquals(1L, ((Long) connectorStatusMetricsGroup.taskCounter("c2").metricValue(0L)).longValue());
        Assert.assertEquals(0L, ((Long) connectorStatusMetricsGroup.taskCounter("fakeConnector").metricValue(0L)).longValue());
        verifyKafkaClusterId();
    }

    @Test
    public void testStartTaskFailure() {
        mockInternalConverters();
        mockFileConfigProvider();
        Map singletonMap = Collections.singletonMap("task.class", "missing.From.This.Workers.Classpath");
        mockKafkaClusterId();
        mockGenericIsolation();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 0, 0);
        Assert.assertFalse(this.worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), singletonMap, this.taskStatusListener, TargetState.STARTED));
        assertStartupStatistics(this.worker, 0, 0, 1, 1);
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 1, 1);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        ((TaskStatus.Listener) Mockito.verify(this.taskStatusListener)).onFailure((ConnectorTaskId) ArgumentMatchers.eq(TASK_ID), (Throwable) ArgumentMatchers.any(ConfigException.class));
        verifyKafkaClusterId();
        verifyGenericIsolation();
    }

    @Test
    public void testCleanupTasksOnStop() {
        mockInternalConverters();
        mockStorage();
        mockFileConfigProvider();
        mockKafkaClusterId();
        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, this.task);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "key.converter", null);
        mockTaskConverter(Plugins.ClassLoaderUsage.PLUGINS, "key.converter", this.taskKeyConverter);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "value.converter", null);
        mockTaskConverter(Plugins.ClassLoaderUsage.PLUGINS, "value.converter", this.taskValueConverter);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, null);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.PLUGINS, this.taskHeaderConverter);
        mockExecutorFakeSubmit(WorkerTask.class);
        Map singletonMap = Collections.singletonMap("task.class", TestSourceTask.class.getName());
        TaskConfig taskConfig = new TaskConfig(singletonMap);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        this.worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), singletonMap, this.taskStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 0, 1);
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        verifyStorage();
        WorkerSourceTask workerSourceTask = (WorkerSourceTask) this.sourceTaskMockedConstruction.constructed().get(0);
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).initialize(taskConfig);
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).loader();
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).stop();
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).awaitStop(ArgumentMatchers.anyLong());
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).removeMetrics();
        verifyKafkaClusterId();
        verifyTaskIsolation(this.task);
        verifyConverters();
        verifyExecutorSubmit();
    }

    @Test
    public void testConverterOverrides() {
        mockInternalConverters();
        mockStorage();
        mockFileConfigProvider();
        Map singletonMap = Collections.singletonMap("task.class", TestSourceTask.class.getName());
        TaskConfig taskConfig = new TaskConfig(singletonMap);
        mockKafkaClusterId();
        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, this.task);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "key.converter", null);
        mockTaskConverter(Plugins.ClassLoaderUsage.PLUGINS, "key.converter", this.taskKeyConverter);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "value.converter", null);
        mockTaskConverter(Plugins.ClassLoaderUsage.PLUGINS, "value.converter", this.taskValueConverter);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, null);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.PLUGINS, this.taskHeaderConverter);
        mockExecutorFakeSubmit(WorkerTask.class);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        Map<String, String> anyConnectorConfigMap = anyConnectorConfigMap();
        anyConnectorConfigMap.put("key.converter", SampleConverterWithHeaders.class.getName());
        anyConnectorConfigMap.put("value.converter", SampleConverterWithHeaders.class.getName());
        this.worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap, singletonMap, this.taskStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 0, 1);
        Assert.assertEquals(Collections.singleton(TASK_ID), this.worker.taskIds());
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        WorkerSourceTask workerSourceTask = (WorkerSourceTask) this.sourceTaskMockedConstruction.constructed().get(0);
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).initialize(taskConfig);
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).stop();
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).awaitStop(ArgumentMatchers.anyLong());
        ((WorkerSourceTask) Mockito.verify(workerSourceTask)).removeMetrics();
        verifyKafkaClusterId();
        verifyTaskIsolation(this.task);
        verifyExecutorSubmit();
        verifyStorage();
    }

    @Test
    public void testProducerConfigsWithoutOverrides() {
        Mockito.when(this.connectorConfig.originalsWithPrefix("producer.override.")).thenReturn(new HashMap());
        HashMap hashMap = new HashMap(this.defaultProducerConfigs);
        hashMap.put("client.id", "connector-producer-job-0");
        hashMap.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap.put("metrics.context.resource.type", "connect");
        hashMap.put("metrics.context.resource.connector", TASK_ID.connector());
        hashMap.put("metrics.context.resource.task", Integer.valueOf(TASK_ID.task()));
        hashMap.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        Assert.assertEquals(hashMap, Worker.baseProducerConfigs(TASK_ID, "connector-producer-" + TASK_ID, this.config, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("producer.override.");
    }

    @Test
    public void testProducerConfigsWithOverrides() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("producer.acks", "-1");
        hashMap.put("producer.linger.ms", "1000");
        hashMap.put("producer.client.id", "producer-test-id");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultProducerConfigs);
        hashMap2.put("acks", "-1");
        hashMap2.put("linger.ms", "1000");
        hashMap2.put("client.id", "producer-test-id");
        hashMap2.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap2.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap2.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap2.put("metrics.context.resource.type", "connect");
        hashMap2.put("metrics.context.resource.connector", TASK_ID.connector());
        hashMap2.put("metrics.context.resource.task", Integer.valueOf(TASK_ID.task()));
        hashMap2.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        Mockito.when(this.connectorConfig.originalsWithPrefix("producer.override.")).thenReturn(new HashMap());
        Assert.assertEquals(hashMap2, Worker.baseProducerConfigs(TASK_ID, "connector-producer-" + TASK_ID, standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("producer.override.");
    }

    @Test
    public void testProducerConfigsWithClientOverrides() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("producer.acks", "-1");
        hashMap.put("producer.linger.ms", "1000");
        hashMap.put("producer.client.id", "producer-test-id");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultProducerConfigs);
        hashMap2.put("acks", "-1");
        hashMap2.put("linger.ms", "5000");
        hashMap2.put("batch.size", "1000");
        hashMap2.put("client.id", "producer-test-id");
        hashMap2.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap2.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap2.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap2.put("metrics.context.resource.type", "connect");
        hashMap2.put("metrics.context.resource.connector", TASK_ID.connector());
        hashMap2.put("metrics.context.resource.task", Integer.valueOf(TASK_ID.task()));
        hashMap2.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        HashMap hashMap3 = new HashMap();
        hashMap3.put("linger.ms", "5000");
        hashMap3.put("batch.size", "1000");
        Mockito.when(this.connectorConfig.originalsWithPrefix("producer.override.")).thenReturn(hashMap3);
        Assert.assertEquals(hashMap2, Worker.baseProducerConfigs(TASK_ID, "connector-producer-" + TASK_ID, standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("producer.override.");
    }

    @Test
    public void testProducerConfigsWithTelemetryProperty() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("confluent.telemetry.metrics.collector.include", ".*");
        hashMap.put("confluent.telemetry.labels.connect.physical_cluster_id", "connect-foo");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultProducerConfigs);
        hashMap2.put("client.id", "connector-producer-job-0");
        hashMap2.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap2.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap2.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap2.put("metrics.context.resource.type", "connect");
        hashMap2.put("metrics.context.resource.connector", TASK_ID.connector());
        hashMap2.put("metrics.context.resource.task", Integer.valueOf(TASK_ID.task()));
        hashMap2.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        hashMap2.put("confluent.telemetry.labels.connect.physical_cluster_id", "connect-foo");
        hashMap2.put("confluent.telemetry.metrics.collector.include", ".*");
        Mockito.when(this.connectorConfig.originalsWithPrefix("producer.override.")).thenReturn(new HashMap());
        Assert.assertEquals(hashMap2, Worker.baseProducerConfigs(TASK_ID, "connector-producer-" + TASK_ID, standaloneConfig, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("producer.override.");
    }

    @Test
    public void testConsumerConfigsWithoutOverrides() {
        HashMap hashMap = new HashMap(this.defaultConsumerConfigs);
        hashMap.put("group.id", "connect-test");
        hashMap.put("client.id", "connector-consumer-test");
        hashMap.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap.put("metrics.context.resource.type", "connect");
        hashMap.put("metrics.context.resource.connector", "test");
        hashMap.put("metrics.context.resource.task", 1);
        hashMap.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        Mockito.when(this.connectorConfig.originalsWithPrefix("consumer.override.")).thenReturn(new HashMap());
        Assert.assertEquals(hashMap, Worker.baseConsumerConfigs(new ConnectorTaskId("test", 1), "connector-consumer-test", this.config, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("consumer.override.");
    }

    @Test
    public void testConsumerConfigsWithOverrides() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("consumer.auto.offset.reset", "latest");
        hashMap.put("consumer.max.poll.records", "1000");
        hashMap.put("consumer.client.id", "consumer-test-id");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultConsumerConfigs);
        hashMap2.put("group.id", "connect-test");
        hashMap2.put("auto.offset.reset", "latest");
        hashMap2.put("max.poll.records", "1000");
        hashMap2.put("client.id", "consumer-test-id");
        hashMap2.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap2.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap2.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap2.put("metrics.context.resource.type", "connect");
        hashMap2.put("metrics.context.resource.connector", "test");
        hashMap2.put("metrics.context.resource.task", 1);
        hashMap2.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        Mockito.when(this.connectorConfig.originalsWithPrefix("consumer.override.")).thenReturn(new HashMap());
        Assert.assertEquals(hashMap2, Worker.baseConsumerConfigs(new ConnectorTaskId("test", 1), "connector-consumer-test", standaloneConfig, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("consumer.override.");
    }

    @Test
    public void testConsumerConfigsWithClientOverrides() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("consumer.auto.offset.reset", "latest");
        hashMap.put("consumer.max.poll.records", "5000");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultConsumerConfigs);
        hashMap2.put("group.id", "connect-job");
        hashMap2.put("auto.offset.reset", "latest");
        hashMap2.put("max.poll.records", "5000");
        hashMap2.put("max.poll.interval.ms", "1000");
        hashMap2.put("client.id", "connector-consumer-job-0");
        hashMap2.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap2.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap2.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap2.put("metrics.context.resource.type", "connect");
        hashMap2.put("metrics.context.resource.connector", TASK_ID.connector());
        hashMap2.put("metrics.context.resource.task", Integer.valueOf(TASK_ID.task()));
        hashMap2.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        HashMap hashMap3 = new HashMap();
        hashMap3.put("max.poll.records", "5000");
        hashMap3.put("max.poll.interval.ms", "1000");
        Mockito.when(this.connectorConfig.originalsWithPrefix("consumer.override.")).thenReturn(hashMap3);
        Assert.assertEquals(hashMap2, Worker.baseConsumerConfigs(TASK_ID, "connector-consumer-" + TASK_ID, standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("consumer.override.");
    }

    @Test
    public void testConsumerConfigsClientOverridesWithNonePolicy() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("consumer.auto.offset.reset", "latest");
        hashMap.put("consumer.max.poll.records", "5000");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("max.poll.records", "5000");
        hashMap2.put("max.poll.interval.ms", "1000");
        Mockito.when(this.connectorConfig.originalsWithPrefix("consumer.override.")).thenReturn(hashMap2);
        Assert.assertThrows(ConnectException.class, () -> {
            Worker.baseConsumerConfigs(CONNECTOR_ID, "connector-consumer-" + TASK_ID, standaloneConfig, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK);
        });
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("consumer.override.");
    }

    @Test
    public void testConsumerConfigsWithoutTelemetryProperty() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("confluent.telemetry.metrics.collector.include", ".*");
        hashMap.put("confluent.telemetry.labels.connect.physical_cluster_id", "connect-foo");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap(this.defaultConsumerConfigs);
        hashMap2.put("group.id", "connect-" + TASK_ID.connector());
        hashMap2.put("client.id", "connector-consumer-job-0");
        hashMap2.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap2.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap2.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap2.put("metrics.context.resource.type", "connect");
        hashMap2.put("metrics.context.resource.connector", "job");
        hashMap2.put("metrics.context.resource.task", Integer.valueOf(TASK_ID.task()));
        hashMap2.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        hashMap2.put("confluent.telemetry.labels.connect.physical_cluster_id", "connect-foo");
        hashMap2.put("confluent.telemetry.metrics.collector.include", ".*");
        Mockito.when(this.connectorConfig.originalsWithPrefix("consumer.override.")).thenReturn(new HashMap());
        Assert.assertEquals(hashMap2, Worker.baseConsumerConfigs(TASK_ID, "connector-consumer-" + TASK_ID, standaloneConfig, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("consumer.override.");
    }

    @Test
    public void testAdminConfigsClientOverridesWithAllPolicy() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("admin.client.id", "testid");
        hashMap.put("admin.metadata.max.age.ms", "5000");
        hashMap.put("producer.bootstrap.servers", "localhost:1234");
        hashMap.put("consumer.bootstrap.servers", "localhost:4761");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        Map singletonMap = Collections.singletonMap("metadata.max.age.ms", "10000");
        HashMap hashMap2 = new HashMap(this.workerProps);
        hashMap2.put("bootstrap.servers", "localhost:9092");
        hashMap2.put("client.id", "testid");
        hashMap2.put("metadata.max.age.ms", "10000");
        hashMap2.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap2.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap2.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap2.put("metrics.context.resource.type", "connect");
        hashMap2.put("metrics.context.resource.connector", "test");
        hashMap2.put("metrics.context.resource.task", 1);
        hashMap2.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        Mockito.when(this.connectorConfig.originalsWithPrefix("admin.override.")).thenReturn(singletonMap);
        Assert.assertEquals(hashMap2, Worker.adminConfigs(new ConnectorTaskId("test", 1), "", standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SOURCE));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("admin.override.");
    }

    @Test
    public void testAdminConfigsClientOverridesWithNonePolicy() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("admin.client.id", "testid");
        hashMap.put("admin.metadata.max.age.ms", "5000");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        Mockito.when(this.connectorConfig.originalsWithPrefix("admin.override.")).thenReturn(Collections.singletonMap("metadata.max.age.ms", "10000"));
        Assert.assertThrows(ConnectException.class, () -> {
            Worker.adminConfigs(TASK_ID, "", standaloneConfig, this.connectorConfig, (Class) null, this.noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SOURCE);
        });
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("admin.override.");
    }

    @Test
    public void testAdminConfigsClientWithTelemetryPropterty() {
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("admin.client.id", "testid");
        hashMap.put("admin.metadata.max.age.ms", "5000");
        hashMap.put("producer.bootstrap.servers", "cbeauho.com");
        hashMap.put("consumer.bootstrap.servers", "localhost:4761");
        hashMap.put("confluent.telemetry.metrics.collector.include", ".*");
        hashMap.put("confluent.telemetry.labels.connect.physical_cluster_id", "connect-foo");
        StandaloneConfig standaloneConfig = new StandaloneConfig(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("metadata.max.age.ms", "10000");
        HashMap hashMap3 = new HashMap(this.workerProps);
        hashMap3.put("bootstrap.servers", "localhost:9092");
        hashMap3.put("client.id", "testid");
        hashMap3.put("metadata.max.age.ms", "10000");
        hashMap3.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
        hashMap3.put("metrics.context.resource.version", AppInfoParser.getVersion());
        hashMap3.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        hashMap3.put("metrics.context.resource.type", "connect");
        hashMap3.put("metrics.context.resource.connector", "test");
        hashMap3.put("metrics.context.resource.task", 1);
        hashMap3.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        hashMap3.put("confluent.telemetry.labels.connect.physical_cluster_id", "connect-foo");
        hashMap3.put("confluent.telemetry.metrics.collector.include", ".*");
        Mockito.when(this.connectorConfig.originalsWithPrefix("admin.override.")).thenReturn(hashMap2);
        Assert.assertEquals(hashMap3, Worker.adminConfigs(new ConnectorTaskId("test", 1), "", standaloneConfig, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SOURCE));
        ((ConnectorConfig) Mockito.verify(this.connectorConfig)).originalsWithPrefix("admin.override.");
    }

    @Test
    public void testRegularSourceOffsetsConsumerConfigs() {
        HashMap hashMap = new HashMap();
        Mockito.when(this.connectorConfig.originalsWithPrefix("consumer.override.")).thenReturn(hashMap);
        HashMap hashMap2 = new HashMap(this.workerProps);
        hashMap2.put("exactly.once.source.support", "enabled");
        hashMap2.put("bootstrap.servers", "localhost:4761");
        hashMap2.put("group.id", "connect-cluster");
        hashMap2.put("config.storage.topic", "connect-configs");
        hashMap2.put("offset.storage.topic", "connect-offsets");
        hashMap2.put("status.storage.topic", "connect-statuses");
        this.config = new DistributedConfig(hashMap2);
        Map regularSourceOffsetsConsumerConfigs = Worker.regularSourceOffsetsConsumerConfigs("test", "", this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:4761", regularSourceOffsetsConsumerConfigs.get("bootstrap.servers"));
        Assert.assertEquals("read_committed", regularSourceOffsetsConsumerConfigs.get("isolation.level"));
        hashMap2.put("consumer.bootstrap.servers", "localhost:9021");
        hashMap2.put("consumer.isolation.level", "read_uncommitted");
        this.config = new DistributedConfig(hashMap2);
        Map regularSourceOffsetsConsumerConfigs2 = Worker.regularSourceOffsetsConsumerConfigs("test", "", this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:9021", regularSourceOffsetsConsumerConfigs2.get("bootstrap.servers"));
        Assert.assertEquals("read_uncommitted", regularSourceOffsetsConsumerConfigs2.get("isolation.level"));
        hashMap2.remove("consumer.isolation.level");
        hashMap.put("bootstrap.servers", "localhost:489");
        hashMap.put("isolation.level", "read_uncommitted");
        this.config = new DistributedConfig(hashMap2);
        Map regularSourceOffsetsConsumerConfigs3 = Worker.regularSourceOffsetsConsumerConfigs("test", "", this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:489", regularSourceOffsetsConsumerConfigs3.get("bootstrap.servers"));
        Assert.assertEquals("read_uncommitted", regularSourceOffsetsConsumerConfigs3.get("isolation.level"));
    }

    @Test
    public void testExactlyOnceSourceOffsetsConsumerConfigs() {
        HashMap hashMap = new HashMap();
        Mockito.when(this.connectorConfig.originalsWithPrefix("consumer.override.")).thenReturn(hashMap);
        HashMap hashMap2 = new HashMap(this.workerProps);
        hashMap2.put("exactly.once.source.support", "enabled");
        hashMap2.put("bootstrap.servers", "localhost:4761");
        hashMap2.put("group.id", "connect-cluster");
        hashMap2.put("config.storage.topic", "connect-configs");
        hashMap2.put("offset.storage.topic", "connect-offsets");
        hashMap2.put("status.storage.topic", "connect-statuses");
        this.config = new DistributedConfig(hashMap2);
        Map exactlyOnceSourceOffsetsConsumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs("test", "", this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:4761", exactlyOnceSourceOffsetsConsumerConfigs.get("bootstrap.servers"));
        Assert.assertEquals("read_committed", exactlyOnceSourceOffsetsConsumerConfigs.get("isolation.level"));
        hashMap2.put("consumer.bootstrap.servers", "localhost:9021");
        hashMap2.put("consumer.isolation.level", "read_uncommitted");
        this.config = new DistributedConfig(hashMap2);
        Map exactlyOnceSourceOffsetsConsumerConfigs2 = Worker.exactlyOnceSourceOffsetsConsumerConfigs("test", "", this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:9021", exactlyOnceSourceOffsetsConsumerConfigs2.get("bootstrap.servers"));
        Assert.assertEquals("read_committed", exactlyOnceSourceOffsetsConsumerConfigs2.get("isolation.level"));
        hashMap2.remove("consumer.isolation.level");
        hashMap.put("bootstrap.servers", "localhost:489");
        hashMap.put("isolation.level", "read_uncommitted");
        this.config = new DistributedConfig(hashMap2);
        Map exactlyOnceSourceOffsetsConsumerConfigs3 = Worker.exactlyOnceSourceOffsetsConsumerConfigs("test", "", this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:489", exactlyOnceSourceOffsetsConsumerConfigs3.get("bootstrap.servers"));
        Assert.assertEquals("read_committed", exactlyOnceSourceOffsetsConsumerConfigs3.get("isolation.level"));
    }

    @Test
    public void testExactlyOnceSourceTaskProducerConfigs() {
        HashMap hashMap = new HashMap();
        Mockito.when(this.connectorConfig.originalsWithPrefix("producer.override.")).thenReturn(hashMap);
        String taskTransactionalId = Worker.taskTransactionalId("connect-cluster", TASK_ID.connector(), TASK_ID.task());
        HashMap hashMap2 = new HashMap(this.workerProps);
        hashMap2.put("exactly.once.source.support", "enabled");
        hashMap2.put("bootstrap.servers", "localhost:4761");
        hashMap2.put("group.id", "connect-cluster");
        hashMap2.put("config.storage.topic", "connect-configs");
        hashMap2.put("offset.storage.topic", "connect-offsets");
        hashMap2.put("status.storage.topic", "connect-statuses");
        this.config = new DistributedConfig(hashMap2);
        Map exactlyOnceSourceTaskProducerConfigs = Worker.exactlyOnceSourceTaskProducerConfigs(TASK_ID, this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:4761", exactlyOnceSourceTaskProducerConfigs.get("bootstrap.servers"));
        Assert.assertEquals("true", exactlyOnceSourceTaskProducerConfigs.get("enable.idempotence"));
        Assert.assertEquals(taskTransactionalId, exactlyOnceSourceTaskProducerConfigs.get("transactional.id"));
        hashMap2.put("producer.bootstrap.servers", "localhost:9021");
        hashMap2.put("producer.enable.idempotence", "false");
        hashMap2.put("producer.transactional.id", "some-other-transactional-id");
        this.config = new DistributedConfig(hashMap2);
        Map exactlyOnceSourceTaskProducerConfigs2 = Worker.exactlyOnceSourceTaskProducerConfigs(TASK_ID, this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:9021", exactlyOnceSourceTaskProducerConfigs2.get("bootstrap.servers"));
        Assert.assertEquals("true", exactlyOnceSourceTaskProducerConfigs2.get("enable.idempotence"));
        Assert.assertEquals(taskTransactionalId, exactlyOnceSourceTaskProducerConfigs2.get("transactional.id"));
        hashMap2.remove("producer.enable.idempotence");
        hashMap2.remove("producer.transactional.id");
        hashMap.put("bootstrap.servers", "localhost:489");
        hashMap.put("enable.idempotence", "false");
        hashMap.put("transactional.id", "yet-another-transactional-id");
        this.config = new DistributedConfig(hashMap2);
        Map exactlyOnceSourceTaskProducerConfigs3 = Worker.exactlyOnceSourceTaskProducerConfigs(TASK_ID, this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("localhost:489", exactlyOnceSourceTaskProducerConfigs3.get("bootstrap.servers"));
        Assert.assertEquals("true", exactlyOnceSourceTaskProducerConfigs3.get("enable.idempotence"));
        Assert.assertEquals(taskTransactionalId, exactlyOnceSourceTaskProducerConfigs3.get("transactional.id"));
        hashMap.put("transactional.id", null);
        Map exactlyOnceSourceTaskProducerConfigs4 = Worker.exactlyOnceSourceTaskProducerConfigs(TASK_ID, this.config, this.connectorConfig, (Class) null, this.allConnectorClientConfigOverridePolicy, CLUSTER_ID);
        Assert.assertEquals("true", exactlyOnceSourceTaskProducerConfigs4.get("enable.idempotence"));
        Assert.assertEquals(taskTransactionalId, exactlyOnceSourceTaskProducerConfigs4.get("transactional.id"));
    }

    @Test
    public void testOffsetStoreForRegularSourceConnector() {
        mockInternalConverters();
        mockFileConfigProvider();
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("exactly.once.source.support", "disabled");
        hashMap.put("bootstrap.servers", "localhost:4761");
        hashMap.put("group.id", "connect-cluster");
        hashMap.put("config.storage.topic", "connect-configs");
        hashMap.put("offset.storage.topic", "worker-offsets");
        hashMap.put("status.storage.topic", "connect-statuses");
        this.config = new DistributedConfig(hashMap);
        mockKafkaClusterId();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.allConnectorClientConfigOverridePolicy);
        this.worker.start();
        HashMap hashMap2 = new HashMap();
        hashMap2.put("name", CONNECTOR_ID);
        hashMap2.put("connector.class", SampleSourceConnector.class.getName());
        hashMap2.put("tasks.max", "1");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector = this.worker.offsetStoreForRegularSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertTrue(offsetStoreForRegularSourceConnector.hasWorkerGlobalStore());
        Assert.assertFalse(offsetStoreForRegularSourceConnector.hasConnectorSpecificStore());
        hashMap2.put("offsets.storage.topic", "connector-offsets-topic");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector2 = this.worker.offsetStoreForRegularSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertTrue(offsetStoreForRegularSourceConnector2.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForRegularSourceConnector2.hasConnectorSpecificStore());
        hashMap2.put("offsets.storage.topic", "worker-offsets");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector3 = this.worker.offsetStoreForRegularSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertFalse(offsetStoreForRegularSourceConnector3.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForRegularSourceConnector3.hasConnectorSpecificStore());
        hashMap2.put("producer.override.bootstrap.servers", "localhost:4761");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector4 = this.worker.offsetStoreForRegularSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertFalse(offsetStoreForRegularSourceConnector4.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForRegularSourceConnector4.hasConnectorSpecificStore());
        hashMap2.put("producer.override.bootstrap.servers", "localhost:1111");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector5 = this.worker.offsetStoreForRegularSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertTrue(offsetStoreForRegularSourceConnector5.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForRegularSourceConnector5.hasConnectorSpecificStore());
        hashMap2.remove("offsets.storage.topic");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector6 = this.worker.offsetStoreForRegularSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertTrue(offsetStoreForRegularSourceConnector6.hasWorkerGlobalStore());
        Assert.assertFalse(offsetStoreForRegularSourceConnector6.hasConnectorSpecificStore());
        this.worker.stop();
        verifyKafkaClusterId();
    }

    @Test
    public void testOffsetStoreForExactlyOnceSourceConnector() {
        mockInternalConverters();
        mockFileConfigProvider();
        HashMap hashMap = new HashMap(this.workerProps);
        hashMap.put("exactly.once.source.support", "enabled");
        hashMap.put("bootstrap.servers", "localhost:4761");
        hashMap.put("group.id", "connect-cluster");
        hashMap.put("config.storage.topic", "connect-configs");
        hashMap.put("offset.storage.topic", "worker-offsets");
        hashMap.put("status.storage.topic", "connect-statuses");
        this.config = new DistributedConfig(hashMap);
        mockKafkaClusterId();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.allConnectorClientConfigOverridePolicy);
        this.worker.start();
        HashMap hashMap2 = new HashMap();
        hashMap2.put("name", CONNECTOR_ID);
        hashMap2.put("connector.class", SampleSourceConnector.class.getName());
        hashMap2.put("tasks.max", "1");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector = this.worker.offsetStoreForExactlyOnceSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertFalse(offsetStoreForExactlyOnceSourceConnector.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector.hasConnectorSpecificStore());
        hashMap2.put("offsets.storage.topic", "connector-offsets-topic");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector2 = this.worker.offsetStoreForExactlyOnceSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector2.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector2.hasConnectorSpecificStore());
        hashMap2.put("offsets.storage.topic", "worker-offsets");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector3 = this.worker.offsetStoreForExactlyOnceSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertFalse(offsetStoreForExactlyOnceSourceConnector3.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector3.hasConnectorSpecificStore());
        hashMap2.put("producer.override.bootstrap.servers", "localhost:4761");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector4 = this.worker.offsetStoreForExactlyOnceSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertFalse(offsetStoreForExactlyOnceSourceConnector4.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector4.hasConnectorSpecificStore());
        hashMap2.put("producer.override.bootstrap.servers", "localhost:1111");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector5 = this.worker.offsetStoreForExactlyOnceSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector5.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector5.hasConnectorSpecificStore());
        hashMap2.remove("offsets.storage.topic");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector6 = this.worker.offsetStoreForExactlyOnceSourceConnector(new SourceConnectorConfig(this.plugins, hashMap2, this.enableTopicCreation), CONNECTOR_ID, this.sourceConnector);
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector6.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceConnector6.hasConnectorSpecificStore());
        this.worker.stop();
        verifyKafkaClusterId();
    }

    @Test
    public void testOffsetStoreForRegularSourceTask() {
        mockInternalConverters();
        mockFileConfigProvider();
        HashMap hashMap = new HashMap();
        Producer producer = (Producer) Mockito.mock(Producer.class);
        TopicAdmin topicAdmin = (TopicAdmin) Mockito.mock(TopicAdmin.class);
        HashMap hashMap2 = new HashMap(this.workerProps);
        hashMap2.put("exactly.once.source.support", "disabled");
        hashMap2.put("bootstrap.servers", "localhost:4761");
        hashMap2.put("group.id", "connect-cluster");
        hashMap2.put("config.storage.topic", "connect-configs");
        hashMap2.put("offset.storage.topic", "worker-offsets");
        hashMap2.put("status.storage.topic", "connect-statuses");
        this.config = new DistributedConfig(hashMap2);
        mockKafkaClusterId();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.allConnectorClientConfigOverridePolicy);
        this.worker.start();
        HashMap hashMap3 = new HashMap();
        hashMap3.put("name", CONNECTOR_ID);
        hashMap3.put("connector.class", SampleSourceConnector.class.getName());
        hashMap3.put("tasks.max", "1");
        SourceConnectorConfig sourceConnectorConfig = new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation);
        hashMap.put("bootstrap.servers", "localhost:4761");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceTask = this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig, this.sourceConnector.getClass(), producer, hashMap, (TopicAdmin) null);
        Assert.assertTrue(offsetStoreForRegularSourceTask.hasWorkerGlobalStore());
        Assert.assertFalse(offsetStoreForRegularSourceTask.hasConnectorSpecificStore());
        hashMap3.put("offsets.storage.topic", "connector-offsets-topic");
        SourceConnectorConfig sourceConnectorConfig2 = new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation);
        ConnectorOffsetBackingStore offsetStoreForRegularSourceTask2 = this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig2, this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertTrue(offsetStoreForRegularSourceTask2.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForRegularSourceTask2.hasConnectorSpecificStore());
        Assert.assertThrows(NullPointerException.class, () -> {
            this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig2, this.sourceConnector.getClass(), producer, hashMap, (TopicAdmin) null);
        });
        hashMap3.put("offsets.storage.topic", "worker-offsets");
        SourceConnectorConfig sourceConnectorConfig3 = new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation);
        ConnectorOffsetBackingStore offsetStoreForRegularSourceTask3 = this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig3, this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertFalse(offsetStoreForRegularSourceTask3.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForRegularSourceTask3.hasConnectorSpecificStore());
        Assert.assertThrows(NullPointerException.class, () -> {
            this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig3, this.sourceConnector.getClass(), producer, hashMap, (TopicAdmin) null);
        });
        hashMap.put("bootstrap.servers", "localhost:4761");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceTask4 = this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig3, this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertFalse(offsetStoreForRegularSourceTask4.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForRegularSourceTask4.hasConnectorSpecificStore());
        Assert.assertThrows(NullPointerException.class, () -> {
            this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig3, this.sourceConnector.getClass(), producer, hashMap, (TopicAdmin) null);
        });
        hashMap.put("bootstrap.servers", "localhost:1111");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceTask5 = this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig3, this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertTrue(offsetStoreForRegularSourceTask5.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForRegularSourceTask5.hasConnectorSpecificStore());
        Assert.assertThrows(NullPointerException.class, () -> {
            this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig3, this.sourceConnector.getClass(), producer, hashMap, (TopicAdmin) null);
        });
        hashMap3.remove("offsets.storage.topic");
        ConnectorOffsetBackingStore offsetStoreForRegularSourceTask6 = this.worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConnectorConfig, this.sourceConnector.getClass(), producer, hashMap, (TopicAdmin) null);
        Assert.assertTrue(offsetStoreForRegularSourceTask6.hasWorkerGlobalStore());
        Assert.assertFalse(offsetStoreForRegularSourceTask6.hasConnectorSpecificStore());
        this.worker.stop();
        verifyKafkaClusterId();
    }

    @Test
    public void testOffsetStoreForExactlyOnceSourceTask() {
        mockInternalConverters();
        mockFileConfigProvider();
        HashMap hashMap = new HashMap();
        Producer producer = (Producer) Mockito.mock(Producer.class);
        TopicAdmin topicAdmin = (TopicAdmin) Mockito.mock(TopicAdmin.class);
        HashMap hashMap2 = new HashMap(this.workerProps);
        hashMap2.put("exactly.once.source.support", "enabled");
        hashMap2.put("bootstrap.servers", "localhost:4761");
        hashMap2.put("group.id", "connect-cluster");
        hashMap2.put("config.storage.topic", "connect-configs");
        hashMap2.put("offset.storage.topic", "worker-offsets");
        hashMap2.put("status.storage.topic", "connect-statuses");
        this.config = new DistributedConfig(hashMap2);
        mockKafkaClusterId();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.allConnectorClientConfigOverridePolicy);
        this.worker.start();
        HashMap hashMap3 = new HashMap();
        hashMap3.put("name", CONNECTOR_ID);
        hashMap3.put("connector.class", SampleSourceConnector.class.getName());
        hashMap3.put("tasks.max", "1");
        SourceConnectorConfig sourceConnectorConfig = new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation);
        hashMap.put("bootstrap.servers", "localhost:4761");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask = this.worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConnectorConfig, this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertFalse(offsetStoreForExactlyOnceSourceTask.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask.hasConnectorSpecificStore());
        hashMap3.put("offsets.storage.topic", "connector-offsets-topic");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask2 = this.worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation), this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask2.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask2.hasConnectorSpecificStore());
        hashMap3.put("offsets.storage.topic", "worker-offsets");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask3 = this.worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation), this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertFalse(offsetStoreForExactlyOnceSourceTask3.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask3.hasConnectorSpecificStore());
        hashMap.put("bootstrap.servers", "localhost:4761");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask4 = this.worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation), this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertFalse(offsetStoreForExactlyOnceSourceTask4.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask4.hasConnectorSpecificStore());
        hashMap.put("bootstrap.servers", "localhost:1111");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask5 = this.worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation), this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask5.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask5.hasConnectorSpecificStore());
        hashMap3.remove("offsets.storage.topic");
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask6 = this.worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, new SourceConnectorConfig(this.plugins, hashMap3, this.enableTopicCreation), this.sourceConnector.getClass(), producer, hashMap, topicAdmin);
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask6.hasWorkerGlobalStore());
        Assert.assertTrue(offsetStoreForExactlyOnceSourceTask6.hasConnectorSpecificStore());
        this.worker.stop();
        verifyKafkaClusterId();
    }

    @Test
    public void testWorkerMetrics() throws Exception {
        mockKafkaClusterId();
        mockInternalConverters();
        mockFileConfigProvider();
        Worker worker = new Worker("worker-1", Time.SYSTEM, this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        worker.metrics().metrics().addMetric(worker.metrics().metrics().metricName("test.avg", "grp1"), new Avg());
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        for (MetricsReporter metricsReporter : worker.metrics().metrics().reporters()) {
            if (metricsReporter instanceof MockConnectMetrics.MockMetricsReporter) {
                Assert.assertEquals(CLUSTER_ID, ((MockConnectMetrics.MockMetricsReporter) metricsReporter).getMetricsContext().contextLabels().get("connect.kafka.cluster.id"));
            }
        }
        Assert.assertNotNull(platformMBeanServer.getObjectInstance(new ObjectName("kafka.connect:type=grp1")));
        verifyKafkaClusterId();
    }

    @Test
    public void testExecutorServiceShutdown() throws InterruptedException {
        mockKafkaClusterId();
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        ((ExecutorService) Mockito.doNothing().when(executorService)).shutdown();
        Mockito.when(Boolean.valueOf(executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS))).thenReturn(true);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.start();
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        verifyKafkaClusterId();
        ((ExecutorService) Mockito.verify(executorService, Mockito.times(1))).shutdown();
        ((ExecutorService) Mockito.verify(executorService, Mockito.times(1))).awaitTermination(1000L, TimeUnit.MILLISECONDS);
        Mockito.verifyNoMoreInteractions(new Object[]{executorService});
    }

    @Test
    public void testExecutorServiceShutdownWhenTerminationFails() throws InterruptedException {
        mockKafkaClusterId();
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        ((ExecutorService) Mockito.doNothing().when(executorService)).shutdown();
        Mockito.when(Boolean.valueOf(executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS))).thenReturn(false);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.start();
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        verifyKafkaClusterId();
        ((ExecutorService) Mockito.verify(executorService, Mockito.times(1))).shutdown();
        ((ExecutorService) Mockito.verify(executorService, Mockito.times(1))).shutdownNow();
        ((ExecutorService) Mockito.verify(executorService, Mockito.times(2))).awaitTermination(1000L, TimeUnit.MILLISECONDS);
        Mockito.verifyNoMoreInteractions(new Object[]{executorService});
    }

    @Test
    public void testExecutorServiceShutdownWhenTerminationThrowsException() throws InterruptedException {
        mockKafkaClusterId();
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        ((ExecutorService) Mockito.doNothing().when(executorService)).shutdown();
        Mockito.when(Boolean.valueOf(executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS))).thenThrow(new Throwable[]{new InterruptedException("interrupt")});
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.start();
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        verifyKafkaClusterId();
        ((ExecutorService) Mockito.verify(executorService, Mockito.times(1))).shutdown();
        ((ExecutorService) Mockito.verify(executorService, Mockito.times(1))).shutdownNow();
        ((ExecutorService) Mockito.verify(executorService, Mockito.times(1))).awaitTermination(1000L, TimeUnit.MILLISECONDS);
        Mockito.verifyNoMoreInteractions(new Object[]{executorService});
    }

    @Test
    public void testZombieFencing() {
        Admin admin = (Admin) Mockito.mock(Admin.class);
        FenceProducersResult fenceProducersResult = (FenceProducersResult) Mockito.mock(FenceProducersResult.class);
        KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        KafkaFuture kafkaFuture2 = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        Mockito.when(admin.fenceProducers((Collection) ArgumentMatchers.any(), (FenceProducersOptions) ArgumentMatchers.any())).thenReturn(fenceProducersResult);
        Mockito.when(fenceProducersResult.all()).thenReturn(kafkaFuture);
        Mockito.when(kafkaFuture.whenComplete((KafkaFuture.BiConsumer) ArgumentMatchers.any())).thenReturn(kafkaFuture2);
        mockKafkaClusterId();
        mockGenericIsolation();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.allConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        Map<String, String> anyConnectorConfigMap = anyConnectorConfigMap();
        anyConnectorConfigMap.put("admin.override.retry.backoff.ms", "4761");
        AtomicReference atomicReference = new AtomicReference();
        Assert.assertEquals(kafkaFuture2, this.worker.fenceZombies(CONNECTOR_ID, 12, anyConnectorConfigMap, map -> {
            atomicReference.set(map);
            return admin;
        }));
        Assert.assertNotNull(atomicReference.get());
        Assert.assertEquals("Admin should be configured with user-specified overrides", "4761", ((Map) atomicReference.get()).get("retry.backoff.ms"));
        verifyKafkaClusterId();
        verifyGenericIsolation();
    }

    private void assertStatusMetrics(long j, String str) {
        ConnectMetrics.MetricGroup metricGroup = this.worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector());
        if (j == 0) {
            Assert.assertNull(metricGroup);
        } else {
            Assert.assertEquals(Long.valueOf(j), MockConnectMetrics.currentMetricValue(this.worker.metrics(), metricGroup, str));
        }
    }

    private void assertStatistics(Worker worker, int i, int i2) {
        assertStatusMetrics(i2, "connector-total-task-count");
        ConnectMetrics.MetricGroup metricGroup = worker.workerMetricsGroup().metricGroup();
        Assert.assertEquals(i, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-count"), 1.0E-4d);
        Assert.assertEquals(i2, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-count"), 1.0E-4d);
        Assert.assertEquals(i2, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-count"), 1.0E-4d);
    }

    private void assertStartupStatistics(Worker worker, int i, int i2, int i3, int i4) {
        double d = i - i2;
        double d2 = i3 - i4;
        double d3 = 0.0d;
        double d4 = 0.0d;
        double d5 = 0.0d;
        double d6 = 0.0d;
        if (i != 0) {
            d3 = d / i;
            d4 = i2 / i;
        }
        if (i3 != 0) {
            d5 = d2 / i3;
            d6 = i4 / i3;
        }
        ConnectMetrics.MetricGroup metricGroup = worker.workerMetricsGroup().metricGroup();
        Assert.assertEquals(i, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-attempts-total"), 1.0E-4d);
        Assert.assertEquals(d, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-success-total"), 1.0E-4d);
        Assert.assertEquals(i2, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-failure-total"), 1.0E-4d);
        Assert.assertEquals(d3, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-success-percentage"), 1.0E-4d);
        Assert.assertEquals(d4, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "connector-startup-failure-percentage"), 1.0E-4d);
        Assert.assertEquals(i3, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-attempts-total"), 1.0E-4d);
        Assert.assertEquals(d2, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-success-total"), 1.0E-4d);
        Assert.assertEquals(i4, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-failure-total"), 1.0E-4d);
        Assert.assertEquals(d5, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-success-percentage"), 1.0E-4d);
        Assert.assertEquals(d6, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), metricGroup, "task-startup-failure-percentage"), 1.0E-4d);
    }

    private void mockStorage() {
        Mockito.when(this.herder.statusBackingStore()).thenReturn(this.statusBackingStore);
    }

    private void verifyStorage() {
        ((OffsetBackingStore) Mockito.verify(this.offsetBackingStore)).start();
        ((Herder) Mockito.verify(this.herder)).statusBackingStore();
        ((OffsetBackingStore) Mockito.verify(this.offsetBackingStore)).stop();
    }

    private void mockInternalConverters() {
        Converter converter = (Converter) Mockito.mock(JsonConverter.class);
        Converter converter2 = (Converter) Mockito.mock(JsonConverter.class);
        Mockito.when(this.plugins.newInternalConverter(ArgumentMatchers.eq(true), ArgumentMatchers.anyString(), ArgumentMatchers.anyMap())).thenReturn(converter);
        Mockito.when(this.plugins.newInternalConverter(ArgumentMatchers.eq(false), ArgumentMatchers.anyString(), ArgumentMatchers.anyMap())).thenReturn(converter2);
    }

    private void verifyConverters() {
        ((Plugins) Mockito.verify(this.plugins, Mockito.times(1))).newInternalConverter(ArgumentMatchers.eq(true), ArgumentMatchers.anyString(), ArgumentMatchers.anyMap());
        ((Plugins) Mockito.verify(this.plugins)).newInternalConverter(ArgumentMatchers.eq(false), ArgumentMatchers.anyString(), ArgumentMatchers.anyMap());
    }

    private void mockTaskConverter(Plugins.ClassLoaderUsage classLoaderUsage, String str, Converter converter) {
        Mockito.when(this.plugins.newConverter((AbstractConfig) ArgumentMatchers.any(AbstractConfig.class), (String) ArgumentMatchers.eq(str), (Plugins.ClassLoaderUsage) ArgumentMatchers.eq(classLoaderUsage))).thenReturn(converter);
    }

    private void verifyTaskConverter(String str) {
        ((Plugins) Mockito.verify(this.plugins)).newConverter((AbstractConfig) ArgumentMatchers.any(AbstractConfig.class), (String) ArgumentMatchers.eq(str), (Plugins.ClassLoaderUsage) ArgumentMatchers.eq(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER));
    }

    private void mockTaskHeaderConverter(Plugins.ClassLoaderUsage classLoaderUsage, HeaderConverter headerConverter) {
        Mockito.when(this.plugins.newHeaderConverter((AbstractConfig) ArgumentMatchers.any(AbstractConfig.class), (String) ArgumentMatchers.eq("header.converter"), (Plugins.ClassLoaderUsage) ArgumentMatchers.eq(classLoaderUsage))).thenReturn(headerConverter);
    }

    private void verifyTaskHeaderConverter() {
        ((Plugins) Mockito.verify(this.plugins)).newHeaderConverter((AbstractConfig) ArgumentMatchers.any(AbstractConfig.class), (String) ArgumentMatchers.eq("header.converter"), (Plugins.ClassLoaderUsage) ArgumentMatchers.eq(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER));
    }

    private void mockGenericIsolation() {
        Mockito.when(this.plugins.connectorLoader(ArgumentMatchers.anyString())).thenReturn(this.pluginLoader);
        Mockito.when(this.plugins.withClassLoader(this.pluginLoader)).thenReturn(this.loaderSwap);
    }

    private void verifyGenericIsolation() {
        ((Plugins) Mockito.verify(this.plugins, Mockito.atLeastOnce())).withClassLoader(this.pluginLoader);
        ((LoaderSwap) Mockito.verify(this.loaderSwap, Mockito.atLeastOnce())).close();
    }

    private void mockConnectorIsolation(String str, Connector connector) {
        mockGenericIsolation();
        Mockito.when(this.plugins.newConnector(str)).thenReturn(connector);
        Mockito.when(connector.version()).thenReturn("1.0");
        Mockito.when(connector.config()).thenReturn(SampleSourceConnector.CONFIG);
    }

    private void verifyConnectorIsolation(Connector connector) {
        verifyGenericIsolation();
        ((Plugins) Mockito.verify(this.plugins)).newConnector(ArgumentMatchers.anyString());
        ((Connector) Mockito.verify(connector, Mockito.atLeastOnce())).version();
    }

    private void mockTaskIsolation(Class<? extends Connector> cls, Class<? extends Task> cls2, Task task) {
        mockGenericIsolation();
        ((Plugins) Mockito.doReturn(cls).when(this.plugins)).connectorClass(cls.getName());
        Mockito.when(this.plugins.newTask(cls2)).thenReturn(task);
        Mockito.when(task.version()).thenReturn("1.0");
    }

    private void verifyTaskIsolation(Task task) {
        verifyGenericIsolation();
        ((Plugins) Mockito.verify(this.plugins)).connectorClass(ArgumentMatchers.anyString());
        ((Plugins) Mockito.verify(this.plugins)).newTask((Class) ArgumentMatchers.any());
        ((Task) Mockito.verify(task)).version();
    }

    private void mockExecutorRealSubmit(Class<? extends Runnable> cls) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(cls);
        Mockito.when(this.plugins.withClassLoader((ClassLoader) ArgumentMatchers.same(this.pluginLoader), (Runnable) forClass.capture())).thenReturn(this.isolatedRunnable);
        ((Runnable) Mockito.doAnswer(invocationOnMock -> {
            ((Runnable) forClass.getValue()).run();
            return null;
        }).when(this.isolatedRunnable)).run();
    }

    private void mockExecutorFakeSubmit(Class<? extends Runnable> cls) {
        Mockito.when(this.plugins.withClassLoader((ClassLoader) ArgumentMatchers.same(this.pluginLoader), (Runnable) ArgumentMatchers.any(cls))).thenReturn(this.isolatedRunnable);
        ((Runnable) Mockito.doNothing().when(this.isolatedRunnable)).run();
        Mockito.when(this.executorService.submit(this.isolatedRunnable)).thenAnswer(invocationOnMock -> {
            this.isolatedRunnable.run();
            return null;
        });
    }

    private void verifyExecutorSubmit() {
        ((Plugins) Mockito.verify(this.plugins)).withClassLoader((ClassLoader) ArgumentMatchers.same(this.pluginLoader), (Runnable) ArgumentMatchers.any(Runnable.class));
        ((Runnable) Mockito.verify(this.isolatedRunnable)).run();
    }

    private void mockKafkaClusterId() {
        this.config = (WorkerConfig) Mockito.spy(this.config);
        ((WorkerConfig) Mockito.doReturn(CLUSTER_ID).when(this.config)).kafkaClusterId();
    }

    private void verifyKafkaClusterId() {
        ((WorkerConfig) Mockito.verify(this.config, Mockito.atLeastOnce())).kafkaClusterId();
    }

    private Map<String, String> anyConnectorConfigMap() {
        HashMap hashMap = new HashMap();
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", SampleSourceConnector.class.getName());
        hashMap.put("tasks.max", "1");
        hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
        hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        return hashMap;
    }

    @Test
    public void testCloseableResourcesTrackedWhileInstantiatingSourceTask() {
        ClusterConfigState clusterConfigState = (ClusterConfigState) Mockito.mock(ClusterConfigState.class);
        Map<String, String> anyConnectorConfigMap = anyConnectorConfigMap();
        anyConnectorConfigMap.put("connector.class", WorkerTestSinkConnector.class.getName());
        ConnectorConfig connectorConfig = new ConnectorConfig(this.plugins, anyConnectorConfigMap);
        WorkerMetricsGroup.TaskStatusListener taskStatusListener = (WorkerMetricsGroup.TaskStatusListener) Mockito.mock(WorkerMetricsGroup.TaskStatusListener.class);
        mockKafkaClusterId();
        mockInternalConverters();
        mockStorage();
        mockFileConfigProvider();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        HashMap hashMap = new HashMap();
        this.worker.buildSourceWorkerTask(clusterConfigState, connectorConfig, TASK_ID, this.task, taskStatusListener, TargetState.STARTED, this.taskKeyConverter, this.taskValueConverter, this.taskHeaderConverter, getClass().getClassLoader(), (autoCloseable, str) -> {
        });
        HashMap hashMap2 = new HashMap();
        hashMap2.put(OffsetStorageReaderImpl.class, 1);
        if (this.enableTopicCreation) {
            hashMap2.put(TopicAdmin.class, 1);
        }
        hashMap2.put(RetryWithToleranceOperator.class, 1);
        hashMap2.put(KafkaProducer.class, 1);
        hashMap2.put(TransformationChain.class, 1);
        hashMap2.put(ErrorHandlingMetrics.class, 1);
        hashMap2.put(LogReporter.class, 1);
        Assert.assertEquals(hashMap2, hashMap);
    }

    @Test
    public void testCloseableResourcesTrackedWhileInstantiatingSinkTask() {
        if (this.enableTopicCreation) {
            return;
        }
        ClusterConfigState clusterConfigState = (ClusterConfigState) Mockito.mock(ClusterConfigState.class);
        Map<String, String> anyConnectorConfigMap = anyConnectorConfigMap();
        anyConnectorConfigMap.put("connector.class", WorkerTestSinkConnector.class.getName());
        ConnectorConfig connectorConfig = new ConnectorConfig(this.plugins, anyConnectorConfigMap);
        WorkerMetricsGroup.TaskStatusListener taskStatusListener = (WorkerMetricsGroup.TaskStatusListener) Mockito.mock(WorkerMetricsGroup.TaskStatusListener.class);
        TestSinkTask testSinkTask = new TestSinkTask();
        mockKafkaClusterId();
        mockInternalConverters();
        mockStorage();
        mockFileConfigProvider();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.noneConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        HashMap hashMap = new HashMap();
        this.worker.buildSinkWorkerTask(clusterConfigState, connectorConfig, TASK_ID, testSinkTask, taskStatusListener, TargetState.STARTED, this.taskKeyConverter, this.taskValueConverter, this.taskHeaderConverter, getClass().getClassLoader(), (autoCloseable, str) -> {
        });
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TransformationChain.class, 1);
        hashMap2.put(RetryWithToleranceOperator.class, 1);
        hashMap2.put(KafkaConsumer.class, 1);
        hashMap2.put(ErrorHandlingMetrics.class, 1);
        hashMap2.put(LogReporter.class, 1);
        Assert.assertEquals(hashMap2, hashMap);
        verifyKafkaClusterId();
    }

    @Test
    @Confluent
    public void testWorkerTracingEnabled() throws Exception {
        mockInternalConverters();
        mockStorage();
        mockFileConfigProvider();
        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, this.task);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "key.converter", this.taskKeyConverter);
        mockTaskConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, "value.converter", this.taskValueConverter);
        mockTaskHeaderConverter(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER, this.taskHeaderConverter);
        mockExecutorFakeSubmit(WorkerTask.class);
        mockKafkaClusterId();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.allConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        assertStatistics(this.worker, 0, 0);
        assertStartupStatistics(this.worker, 0, 0, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        HashMap hashMap = new HashMap();
        hashMap.put("task.class", TestSourceTask.class.getName());
        Map<String, String> anyConnectorConfigMap = anyConnectorConfigMap();
        anyConnectorConfigMap.put("trace.records.enable", Boolean.TRUE.toString());
        anyConnectorConfigMap.put("producer.override.linger.ms", "100");
        anyConnectorConfigMap.put("producer.override.request.timeout.ms", "30000");
        this.worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap, hashMap, this.taskStatusListener, TargetState.STARTED);
        assertStatistics(this.worker, 0, 1);
        Assert.assertEquals(new HashSet(Arrays.asList(TASK_ID)), this.worker.taskIds());
        this.worker.stopAndAwaitTask(TASK_ID);
        assertStatistics(this.worker, 0, 0);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.stop();
        assertStatistics(this.worker, 0, 0);
        verifyGenericIsolation();
        ((Plugins) Mockito.verify(this.plugins, Mockito.times(3))).connectorClass(ArgumentMatchers.anyString());
        ((Plugins) Mockito.verify(this.plugins)).newTask((Class) ArgumentMatchers.any());
        ((TestSourceTask) Mockito.verify(this.task)).version();
        ((Plugins) Mockito.verify(this.plugins, Mockito.times(2))).newConverter((AbstractConfig) ArgumentMatchers.any(AbstractConfig.class), (String) ArgumentMatchers.eq("key.converter"), (Plugins.ClassLoaderUsage) ArgumentMatchers.eq(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER));
        ((Plugins) Mockito.verify(this.plugins, Mockito.times(2))).newConverter((AbstractConfig) ArgumentMatchers.any(AbstractConfig.class), (String) ArgumentMatchers.eq("value.converter"), (Plugins.ClassLoaderUsage) ArgumentMatchers.eq(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER));
        ((Plugins) Mockito.verify(this.plugins, Mockito.times(2))).newHeaderConverter((AbstractConfig) ArgumentMatchers.any(AbstractConfig.class), (String) ArgumentMatchers.eq("header.converter"), (Plugins.ClassLoaderUsage) ArgumentMatchers.eq(Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER));
        verifyExecutorSubmit();
        verifyKafkaClusterId();
    }

    @Test
    public void testCreateConnectorLogEventState() {
        mockKafkaClusterId();
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.executorService, this.noneConnectorClientConfigOverridePolicy);
        this.worker.start();
        MockTime mockTime = new MockTime();
        Long l = 10000L;
        Map.Entry mkEntry = Utils.mkEntry(CONNECTOR_ID, new LogEventState(CONNECTOR_ID, l.longValue(), mockTime));
        Map.Entry mkEntry2 = Utils.mkEntry(TASK_ID.toString(), new LogEventState(TASK_ID.toString(), l.longValue(), mockTime));
        this.worker.createIfAbsentConnectorLogEventState(mkEntry);
        this.worker.createIfAbsentTaskLogEventState(mkEntry2);
        Assert.assertEquals(1L, this.worker.connectorLogEventStateMap().size());
        Assert.assertEquals(mkEntry.getValue(), this.worker.connectorLogEventStateMap().get(CONNECTOR_ID));
        Assert.assertEquals(1L, this.worker.taskLogEventStateMap().size());
        Assert.assertEquals(mkEntry2.getValue(), this.worker.taskLogEventStateMap().get(TASK_ID.toString()));
        this.worker.createIfAbsentConnectorLogEventState(Utils.mkEntry(CONNECTOR_ID, new LogEventState(CONNECTOR_ID, l.longValue() * 2, mockTime)));
        Assert.assertEquals(mkEntry.getValue(), this.worker.connectorLogEventStateMap().get(CONNECTOR_ID));
        this.worker.createIfAbsentTaskLogEventState(Utils.mkEntry(TASK_ID.toString(), new LogEventState(TASK_ID.toString(), l.longValue() * 2, mockTime)));
        Assert.assertEquals(mkEntry2.getValue(), this.worker.taskLogEventStateMap().get(TASK_ID.toString()));
        this.worker.stop();
        verifyKafkaClusterId();
    }
}
