package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnectorTest.class */
public class WorkerConnectorTest {
    /** Version string that every connector mock in this class is stubbed to report. */
    private static final String VERSION = "1.1";
    /** Connector name handed to each {@code WorkerConnector} built by the tests. */
    public static final String CONNECTOR = "connector";
    /** Connector properties shared by all tests; populated by this class's static initializer. */
    public static final Map<String, String> CONFIG = new HashMap<>();
    /** Connector configuration rebuilt from {@link #CONFIG} before every test. */
    public ConnectorConfig connectorConfig;
    /** Metrics harness created before every test and stopped afterwards. */
    public MockConnectMetrics metrics;

    @Mock
    private Plugins plugins;

    @Mock
    private SourceConnector sourceConnector;

    @Mock
    private SinkConnector sinkConnector;

    @Mock
    private CloseableConnectorContext ctx;

    @Mock
    private ConnectorStatus.Listener listener;

    @Mock
    private Herder herder;

    @Mock
    private CloseableOffsetStorageReader offsetStorageReader;

    @Mock
    private ConnectorOffsetBackingStore offsetStore;

    @Mock
    private ClassLoader classLoader;

    /** Connector under test; each test points this at one of the mocks before building the worker. */
    private Connector connector;

    /**
     * Placeholder connector type. Its class name is what this test class registers under the
     * {@code connector.class} key of {@link #CONFIG}; it is abstract and has only a private
     * constructor.
     */
    private abstract static class TestConnector extends Connector {
        private TestConnector() {
        }
    }

    /** Builds a fresh connector configuration and metrics harness before each test. */
    @Before
    public void setup() {
        connectorConfig = new ConnectorConfig(plugins, CONFIG);
        metrics = new MockConnectMetrics();
    }

    /** Stops the metrics harness after each test, if one was created. */
    @After
    public void tearDown() {
        if (metrics == null) {
            return;
        }
        metrics.stop();
    }

    /**
     * A source connector whose {@code initialize} throws: the failure is reported to the status
     * listener, the metrics show the failed state, and shutdown afterwards leaves the metrics
     * in the stopped state.
     */
    @Test
    public void testInitializeFailure() {
        final RuntimeException initFailure = new RuntimeException();
        connector = sourceConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Mockito.doThrow(initFailure).when(connector).initialize(ArgumentMatchers.any());

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                offsetStorageReader, offsetStore, classLoader);

        worker.initialize();
        assertFailedMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(listener).onFailure(CONNECTOR, initFailure);
        // The connector was never started, so stop() is not part of the expectations.
        verifyCleanShutdown(false);
    }

    /**
     * After a failed {@code initialize}, a request to transition to STARTED leaves the metrics
     * in the failed state and completes the callback with an exception and a null result.
     */
    @Test
    public void testFailureIsFinalState() {
        final RuntimeException initFailure = new RuntimeException();
        connector = sinkConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Mockito.doThrow(initFailure).when(connector).initialize(ArgumentMatchers.any());
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                (CloseableOffsetStorageReader) null, (ConnectorOffsetBackingStore) null, classLoader);

        worker.initialize();
        assertFailedMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertFailedMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(listener).onFailure(CONNECTOR, initFailure);
        verifyCleanShutdown(false);
        Mockito.verify(onStateChange).onCompletion(ArgumentMatchers.any(Exception.class), ArgumentMatchers.isNull());
        Mockito.verifyNoMoreInteractions(onStateChange);
    }

    /**
     * Happy path for a source connector: initialize, transition to STARTED, then shut down.
     * Expects start, the startup notification, a clean shutdown including stop(), and a single
     * successful callback completion.
     */
    @Test
    public void testStartupAndShutdown() {
        connector = sourceConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                offsetStorageReader, offsetStore, classLoader);

        worker.initialize();
        assertInitializedSourceMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertRunningMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(connector).start(CONFIG);
        Mockito.verify(listener).onStartup(CONNECTOR);
        verifyCleanShutdown(true);
        Mockito.verify(onStateChange).onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(onStateChange);
    }

    /**
     * A sink connector is started and then paused before shutdown. Expects the startup and pause
     * notifications, and callback completions for STARTED then PAUSED, in that order.
     */
    @Test
    public void testStartupAndPause() {
        connector = sinkConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                (CloseableOffsetStorageReader) null, (ConnectorOffsetBackingStore) null, classLoader);

        worker.initialize();
        assertInitializedSinkMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertRunningMetric(worker);

        worker.doTransitionTo(TargetState.PAUSED, onStateChange);
        assertPausedMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(connector).start(CONFIG);
        Mockito.verify(listener).onStartup(CONNECTOR);
        Mockito.verify(listener).onPause(CONNECTOR);
        verifyCleanShutdown(true);

        InOrder callbackOrder = Mockito.inOrder(onStateChange);
        callbackOrder.verify(onStateChange).onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.STARTED));
        callbackOrder.verify(onStateChange).onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.PAUSED));
        Mockito.verifyNoMoreInteractions(onStateChange);
    }

    /**
     * A source connector is paused straight after initialization and then started. Expects the
     * pause notification, start, the resume notification, and callback completions for PAUSED
     * then STARTED, in that order.
     */
    @Test
    public void testOnResume() {
        connector = sourceConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                offsetStorageReader, offsetStore, classLoader);

        worker.initialize();
        assertInitializedSourceMetric(worker);

        worker.doTransitionTo(TargetState.PAUSED, onStateChange);
        assertPausedMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertRunningMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(listener).onPause(CONNECTOR);
        Mockito.verify(connector).start(CONFIG);
        Mockito.verify(listener).onResume(CONNECTOR);
        verifyCleanShutdown(true);

        InOrder callbackOrder = Mockito.inOrder(onStateChange);
        callbackOrder.verify(onStateChange).onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.PAUSED));
        callbackOrder.verify(onStateChange).onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(onStateChange);
    }

    /**
     * A sink connector goes directly to PAUSED after initialization and is then shut down.
     * Expects the pause notification, a clean shutdown without stop() being verified, and one
     * successful callback completion for PAUSED.
     */
    @Test
    public void testStartupPaused() {
        connector = sinkConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                (CloseableOffsetStorageReader) null, (ConnectorOffsetBackingStore) null, classLoader);

        worker.initialize();
        assertInitializedSinkMetric(worker);

        worker.doTransitionTo(TargetState.PAUSED, onStateChange);
        assertPausedMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(listener).onPause(CONNECTOR);
        verifyCleanShutdown(false);
        Mockito.verify(onStateChange).onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.PAUSED));
        Mockito.verifyNoMoreInteractions(onStateChange);
    }

    /**
     * A sink connector whose {@code start} throws: the metrics switch to failed, the listener is
     * told about the failure, and the callback completes with an exception and a null result.
     */
    @Test
    public void testStartupFailure() {
        final RuntimeException startFailure = new RuntimeException();
        connector = sinkConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Mockito.doThrow(startFailure).when(connector).start(CONFIG);
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                (CloseableOffsetStorageReader) null, (ConnectorOffsetBackingStore) null, classLoader);

        worker.initialize();
        assertInitializedSinkMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertFailedMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(connector).start(CONFIG);
        Mockito.verify(listener).onFailure(CONNECTOR, startFailure);
        verifyCleanShutdown(false);
        Mockito.verify(onStateChange).onCompletion(ArgumentMatchers.any(Exception.class), ArgumentMatchers.isNull());
        Mockito.verifyNoMoreInteractions(onStateChange);
    }

    /**
     * A started source connector whose {@code stop} throws during shutdown: the metrics end in
     * the failed state and the listener receives the failure. The shutdown notification is not
     * part of the expectations here.
     */
    @Test
    public void testShutdownFailure() {
        final RuntimeException stopFailure = new RuntimeException();
        connector = sourceConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Mockito.doThrow(stopFailure).when(connector).stop();
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                offsetStorageReader, offsetStore, classLoader);

        worker.initialize();
        assertInitializedSourceMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertRunningMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertFailedMetric(worker);

        verifyInitialize();
        Mockito.verify(connector).start(CONFIG);
        Mockito.verify(listener).onStartup(CONNECTOR);
        Mockito.verify(onStateChange).onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(onStateChange);
        Mockito.verify(listener).onFailure(CONNECTOR, stopFailure);
        // stop() is verified, but onShutdown is not.
        verifyShutdown(false, true);
    }

    /**
     * Feeds the worker's herder-backed metrics a connector status containing tasks in five
     * different states and checks that {@code connector-total-task-count} equals the number
     * of tasks supplied.
     */
    @Test
    public void testConnectorTaskMetrics() {
        this.connector = this.sinkConnector;

        final int runningTasks = 1;
        final int pausedTasks = 2;
        final int failedTasks = 3;
        final int unassignedTasks = 4;
        final int destroyedTasks = 5;
        // Derive the expectation from the inputs instead of hard-coding the sum.
        final long expectedTotalTasks =
                runningTasks + pausedTasks + failedTasks + unassignedTasks + destroyedTasks;

        ArrayList<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
        addTaskStates(taskStates, "RUNNING", runningTasks);
        addTaskStates(taskStates, "PAUSED", pausedTasks);
        addTaskStates(taskStates, "FAILED", failedTasks);
        addTaskStates(taskStates, "UNASSIGNED", unassignedTasks);
        addTaskStates(taskStates, "DESTROYED", destroyedTasks);

        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Mockito.when(this.herder.connectorStatus(CONNECTOR)).thenReturn(
                new ConnectorStateInfo(
                        CONNECTOR,
                        new ConnectorStateInfo.ConnectorState("RUNNING", "worker", "msg"),
                        taskStates,
                        ConnectorType.SINK));

        WorkerConnector workerConnector = new WorkerConnector(
                CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener,
                this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.metrics().addHerderMetrics(this.herder);
        workerConnector.initialize();

        Object actualTotalTasks = this.metrics.currentMetricValue(
                workerConnector.metrics().metricGroup(), "connector-total-task-count");
        // Compare as objects: assertEquals(long, Long) is ambiguous between the (long, long) and
        // (Object, Object) overloads, and this form also fails cleanly if the metric is missing.
        Assert.assertEquals(Long.valueOf(expectedTotalTasks), actualTotalTasks);

        verifyInitialize();
    }

    /** Appends {@code count} task states with the given state name to {@code taskStates}. */
    private static void addTaskStates(ArrayList<ConnectorStateInfo.TaskState> taskStates, String state, int count) {
        for (int i = 0; i < count; i++) {
            taskStates.add(new ConnectorStateInfo.TaskState(0, state, "worker", "msg"));
        }
    }

    /**
     * Requesting STARTED twice on a source connector: {@code start} and the startup notification
     * are each expected once, while the callback completes successfully twice.
     */
    @Test
    public void testTransitionStartedToStarted() {
        connector = sourceConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                offsetStorageReader, offsetStore, classLoader);

        worker.initialize();
        assertInitializedSourceMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertRunningMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertRunningMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(connector).start(CONFIG);
        Mockito.verify(listener).onStartup(CONNECTOR);
        verifyCleanShutdown(true);
        Mockito.verify(onStateChange, Mockito.times(2))
                .onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(onStateChange);
    }

    /**
     * A started source connector is asked to pause twice: the pause notification is expected
     * once, while the callback completes once for STARTED and then twice for PAUSED.
     */
    @Test
    public void testTransitionPausedToPaused() {
        connector = sourceConnector;

        Mockito.when(connector.version()).thenReturn(VERSION);
        Callback<TargetState> onStateChange = mockCallback();

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                offsetStorageReader, offsetStore, classLoader);

        worker.initialize();
        assertInitializedSourceMetric(worker);

        worker.doTransitionTo(TargetState.STARTED, onStateChange);
        assertRunningMetric(worker);

        worker.doTransitionTo(TargetState.PAUSED, onStateChange);
        assertPausedMetric(worker);

        worker.doTransitionTo(TargetState.PAUSED, onStateChange);
        assertPausedMetric(worker);

        worker.shutdown();
        worker.doShutdown();
        assertStoppedMetric(worker);

        verifyInitialize();
        Mockito.verify(connector).start(CONFIG);
        Mockito.verify(listener).onStartup(CONNECTOR);
        Mockito.verify(listener).onPause(CONNECTOR);
        verifyCleanShutdown(true);

        InOrder callbackOrder = Mockito.inOrder(onStateChange);
        callbackOrder.verify(onStateChange)
                .onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.STARTED));
        callbackOrder.verify(onStateChange, Mockito.times(2))
                .onCompletion(ArgumentMatchers.isNull(), ArgumentMatchers.eq(TargetState.PAUSED));
        Mockito.verifyNoMoreInteractions(onStateChange);
    }

    /**
     * Initializing a worker around a plain {@code Connector} mock (neither source nor sink) must
     * report a {@code ConnectException} whose message contains "must be a subclass of".
     */
    @Test
    public void testFailConnectorThatIsNeitherSourceNorSink() {
        connector = Mockito.mock(Connector.class);
        Mockito.when(connector.version()).thenReturn(VERSION);

        WorkerConnector worker = new WorkerConnector(
                CONNECTOR, connector, connectorConfig, ctx, metrics, listener,
                offsetStorageReader, offsetStore, classLoader);
        worker.initialize();

        Mockito.verify(connector).version();

        // Capture whatever failure was handed to the listener so its type and message can be checked.
        ArgumentCaptor<Throwable> failureCaptor = ArgumentCaptor.forClass(Throwable.class);
        Mockito.verify(listener).onFailure(ArgumentMatchers.eq(CONNECTOR), failureCaptor.capture());

        Throwable reported = failureCaptor.getValue();
        Assert.assertTrue(reported instanceof ConnectException);
        Assert.assertTrue(reported.getMessage().contains("must be a subclass of"));
    }

    /** Asserts that the connector's metrics report the failed state and none of the others. */
    protected void assertFailedMetric(WorkerConnector workerConnector) {
        assertMetricFlags(workerConnector, false, true, false, false);
    }

    /** Asserts that the connector's metrics report the paused state and none of the others. */
    protected void assertPausedMetric(WorkerConnector workerConnector) {
        assertMetricFlags(workerConnector, false, false, true, false);
    }

    /** Asserts that the connector's metrics report the running state and none of the others. */
    protected void assertRunningMetric(WorkerConnector workerConnector) {
        assertMetricFlags(workerConnector, false, false, false, true);
    }

    /** Asserts that the connector's metrics report the unassigned state and none of the others. */
    protected void assertStoppedMetric(WorkerConnector workerConnector) {
        assertMetricFlags(workerConnector, true, false, false, false);
    }

    /**
     * Checks the four state flags of the connector's metrics against the expected values, in the
     * order unassigned, failed, paused, running.
     */
    private void assertMetricFlags(WorkerConnector workerConnector,
                                   boolean unassigned,
                                   boolean failed,
                                   boolean paused,
                                   boolean running) {
        assertFlag(unassigned, workerConnector.metrics().isUnassigned());
        assertFlag(failed, workerConnector.metrics().isFailed());
        assertFlag(paused, workerConnector.metrics().isPaused());
        assertFlag(running, workerConnector.metrics().isRunning());
    }

    /** Delegates to {@code assertTrue} or {@code assertFalse} depending on the expected value. */
    private static void assertFlag(boolean expected, boolean actual) {
        if (expected) {
            Assert.assertTrue(actual);
        } else {
            Assert.assertFalse(actual);
        }
    }

    /** Asserts the freshly-initialized metric state with an expected connector type of "sink". */
    protected void assertInitializedSinkMetric(WorkerConnector workerConnector) {
        final String expectedType = "sink";
        assertInitializedMetric(workerConnector, expectedType);
    }

    /** Asserts the freshly-initialized metric state with an expected connector type of "source". */
    protected void assertInitializedSourceMetric(WorkerConnector workerConnector) {
        final String expectedType = "source";
        assertInitializedMetric(workerConnector, expectedType);
    }

    /**
     * Asserts the metric state expected right after initialization: only the unassigned flag is
     * set, the {@code connector-type} metric equals {@code str}, the {@code connector-class}
     * metric is non-null, and the {@code connector-version} metric equals {@link #VERSION}.
     *
     * @param workerConnector the worker whose metrics are inspected
     * @param str             the expected value of the {@code connector-type} metric
     */
    protected void assertInitializedMetric(WorkerConnector workerConnector, String str) {
        Assert.assertTrue(workerConnector.metrics().isUnassigned());
        Assert.assertFalse(workerConnector.metrics().isFailed());
        Assert.assertFalse(workerConnector.metrics().isPaused());
        Assert.assertFalse(workerConnector.metrics().isRunning());

        ConnectMetrics.MetricGroup group = workerConnector.metrics().metricGroup();
        // The status metric is read but its value is not asserted.
        metrics.currentMetricValueAsString(group, "status");
        String actualType = metrics.currentMetricValueAsString(group, "connector-type");
        String actualClass = metrics.currentMetricValueAsString(group, "connector-class");
        String actualVersion = metrics.currentMetricValueAsString(group, "connector-version");

        Assert.assertEquals(str, actualType);
        Assert.assertNotNull(actualClass);
        Assert.assertEquals(VERSION, actualVersion);
    }

    /**
     * Creates a Mockito mock of the state-transition callback.
     *
     * @return a fresh {@code Callback<TargetState>} mock
     */
    // Mockito.mock(Callback.class) can only produce the raw type; the mock has no generic
    // state of its own, so narrowing it to Callback<TargetState> is safe.
    @SuppressWarnings("unchecked")
    private Callback<TargetState> mockCallback() {
        return Mockito.mock(Callback.class);
    }

    /**
     * Verifies the interactions expected from initialization: {@code version()} on the connector,
     * then for a source connector a start of the offset store plus {@code initialize} with a
     * source context, or otherwise {@code initialize} with a sink context.
     */
    private void verifyInitialize() {
        Mockito.verify(connector).version();
        if (connector instanceof SourceConnector) {
            Mockito.verify(offsetStore).start();
            Mockito.verify(connector).initialize(ArgumentMatchers.any(SourceConnectorContext.class));
            return;
        }
        Mockito.verify(connector).initialize(ArgumentMatchers.any(SinkConnectorContext.class));
    }

    /**
     * Verifies a shutdown that includes the listener's shutdown notification.
     *
     * @param z whether {@code stop()} on the connector should be verified as well
     */
    private void verifyCleanShutdown(boolean z) {
        final boolean expectShutdownNotification = true;
        verifyShutdown(expectShutdownNotification, z);
    }

    /**
     * Verifies the interactions expected from shutting the worker down. The connector context is
     * always expected to be closed; for a source connector the offset reader must be closed and
     * the offset store stopped as well.
     *
     * @param z  whether the listener's {@code onShutdown} notification is verified
     * @param z2 whether {@code stop()} on the connector is verified
     */
    private void verifyShutdown(boolean z, boolean z2) {
        Mockito.verify(ctx).close();

        final boolean isSource = connector instanceof SourceConnector;
        if (isSource) {
            Mockito.verify(offsetStorageReader).close();
            Mockito.verify(offsetStore).stop();
        }

        if (z) {
            Mockito.verify(listener).onShutdown(CONNECTOR);
        }

        if (z2) {
            Mockito.verify(connector).stop();
        }
    }

    // Populate the connector properties shared by every test in this class.
    static {
        CONFIG.put("name", CONNECTOR);
        CONFIG.put("topics", "my-topic");
        CONFIG.put("connector.class", TestConnector.class.getName());
    }
}
