package org.apache.kafka.connect.runtime;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.Mock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({WorkerTask.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTaskTest.class */
public class WorkerTaskTest {
    private static final Map<String, String> TASK_PROPS = new HashMap();
    private static final TaskConfig TASK_CONFIG;
    private ConnectMetrics metrics;

    @Mock
    private TaskStatus.Listener statusListener;

    @Mock
    private ClassLoader loader;
    RetryWithToleranceOperator retryWithToleranceOperator;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTaskTest$TestSinkTask.class */
    private static abstract class TestSinkTask extends SinkTask {
        private TestSinkTask() {
        }
    }

    @Before
    public void setup() {
        this.metrics = new MockConnectMetrics();
        this.retryWithToleranceOperator = RetryWithToleranceOperatorTest.NOOP_OPERATOR;
    }

    @After
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.stop();
        }
    }

    @Test
    public void standardStartup() {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId("foo", 0);
        WorkerTask workerTask = (WorkerTask) EasyMock.partialMockBuilder(WorkerTask.class).withConstructor(new Class[]{ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class, ClassLoader.class, ConnectMetrics.class, RetryWithToleranceOperator.class}).withArgs(new Object[]{connectorTaskId, this.statusListener, TargetState.STARTED, this.loader, this.metrics, this.retryWithToleranceOperator}).addMockedMethod("initialize").addMockedMethod("execute").addMockedMethod("close").createStrictMock();
        workerTask.initialize(TASK_CONFIG);
        EasyMock.expectLastCall();
        workerTask.execute();
        EasyMock.expectLastCall();
        this.statusListener.onStartup(connectorTaskId);
        EasyMock.expectLastCall();
        workerTask.close();
        EasyMock.expectLastCall();
        workerTask.releaseResources();
        EasyMock.expectLastCall();
        this.statusListener.onShutdown(connectorTaskId);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{workerTask});
        workerTask.initialize(TASK_CONFIG);
        workerTask.run();
        workerTask.stop();
        workerTask.awaitStop(1000L);
        EasyMock.verify(new Object[]{workerTask});
    }

    @Test
    public void stopBeforeStarting() {
        WorkerTask workerTask = (WorkerTask) EasyMock.partialMockBuilder(WorkerTask.class).withConstructor(new Class[]{ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class, ClassLoader.class, ConnectMetrics.class, RetryWithToleranceOperator.class}).withArgs(new Object[]{new ConnectorTaskId("foo", 0), this.statusListener, TargetState.STARTED, this.loader, this.metrics, this.retryWithToleranceOperator}).addMockedMethod("initialize").addMockedMethod("execute").addMockedMethod("close").createStrictMock();
        workerTask.initialize(TASK_CONFIG);
        EasyMock.expectLastCall();
        workerTask.close();
        EasyMock.expectLastCall();
        workerTask.releaseResources();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{workerTask});
        workerTask.initialize(TASK_CONFIG);
        workerTask.stop();
        workerTask.awaitStop(1000L);
        workerTask.run();
        EasyMock.verify(new Object[]{workerTask});
    }

    @Test
    public void cancelBeforeStopping() throws Exception {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId("foo", 0);
        WorkerTask workerTask = (WorkerTask) EasyMock.partialMockBuilder(WorkerTask.class).withConstructor(new Class[]{ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class, ClassLoader.class, ConnectMetrics.class, RetryWithToleranceOperator.class}).withArgs(new Object[]{connectorTaskId, this.statusListener, TargetState.STARTED, this.loader, this.metrics, this.retryWithToleranceOperator}).addMockedMethod("initialize").addMockedMethod("execute").addMockedMethod("close").createStrictMock();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final Thread thread = new Thread() { // from class: org.apache.kafka.connect.runtime.WorkerTaskTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    countDownLatch.await();
                } catch (Exception e) {
                }
            }
        };
        workerTask.initialize(TASK_CONFIG);
        EasyMock.expectLastCall();
        workerTask.execute();
        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { // from class: org.apache.kafka.connect.runtime.WorkerTaskTest.2
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m49answer() throws Throwable {
                thread.start();
                return null;
            }
        });
        this.statusListener.onStartup(connectorTaskId);
        EasyMock.expectLastCall();
        workerTask.close();
        EasyMock.expectLastCall();
        workerTask.releaseResources();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{workerTask});
        workerTask.initialize(TASK_CONFIG);
        workerTask.run();
        workerTask.stop();
        workerTask.cancel();
        countDownLatch.countDown();
        thread.join();
        EasyMock.verify(new Object[]{workerTask});
    }

    @Test
    public void updateMetricsOnListenerEventsForStartupPauseResumeAndShutdown() {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId("foo", 0);
        WorkerTask.TaskMetricsGroup taskMetricsGroup = new WorkerTask.TaskMetricsGroup(connectorTaskId, new MockConnectMetrics(), this.statusListener);
        this.statusListener.onStartup(connectorTaskId);
        EasyMock.expectLastCall();
        this.statusListener.onPause(connectorTaskId);
        EasyMock.expectLastCall();
        this.statusListener.onResume(connectorTaskId);
        EasyMock.expectLastCall();
        this.statusListener.onShutdown(connectorTaskId);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.statusListener});
        taskMetricsGroup.onStartup(connectorTaskId);
        assertRunningMetric(taskMetricsGroup);
        taskMetricsGroup.onPause(connectorTaskId);
        assertPausedMetric(taskMetricsGroup);
        taskMetricsGroup.onResume(connectorTaskId);
        assertRunningMetric(taskMetricsGroup);
        taskMetricsGroup.onShutdown(connectorTaskId);
        assertStoppedMetric(taskMetricsGroup);
        EasyMock.verify(new Object[]{this.statusListener});
    }

    @Test
    public void updateMetricsOnListenerEventsForStartupPauseResumeAndFailure() {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId("foo", 0);
        MockConnectMetrics mockConnectMetrics = new MockConnectMetrics();
        MockTime m20time = mockConnectMetrics.m20time();
        ConnectException connectException = new ConnectException("error");
        WorkerTask.TaskMetricsGroup taskMetricsGroup = new WorkerTask.TaskMetricsGroup(connectorTaskId, mockConnectMetrics, this.statusListener);
        this.statusListener.onStartup(connectorTaskId);
        EasyMock.expectLastCall();
        this.statusListener.onPause(connectorTaskId);
        EasyMock.expectLastCall();
        this.statusListener.onResume(connectorTaskId);
        EasyMock.expectLastCall();
        this.statusListener.onPause(connectorTaskId);
        EasyMock.expectLastCall();
        this.statusListener.onResume(connectorTaskId);
        EasyMock.expectLastCall();
        this.statusListener.onFailure(connectorTaskId, connectException);
        EasyMock.expectLastCall();
        this.statusListener.onShutdown(connectorTaskId);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.statusListener});
        m20time.sleep(1000L);
        taskMetricsGroup.onStartup(connectorTaskId);
        assertRunningMetric(taskMetricsGroup);
        m20time.sleep(2000L);
        taskMetricsGroup.onPause(connectorTaskId);
        assertPausedMetric(taskMetricsGroup);
        m20time.sleep(3000L);
        taskMetricsGroup.onResume(connectorTaskId);
        assertRunningMetric(taskMetricsGroup);
        m20time.sleep(4000L);
        taskMetricsGroup.onPause(connectorTaskId);
        assertPausedMetric(taskMetricsGroup);
        m20time.sleep(ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        taskMetricsGroup.onResume(connectorTaskId);
        assertRunningMetric(taskMetricsGroup);
        m20time.sleep(6000L);
        taskMetricsGroup.onFailure(connectorTaskId, connectException);
        assertFailedMetric(taskMetricsGroup);
        m20time.sleep(7000L);
        taskMetricsGroup.onShutdown(connectorTaskId);
        assertStoppedMetric(taskMetricsGroup);
        EasyMock.verify(new Object[]{this.statusListener});
        Assert.assertEquals(8000.0d / 27000, mockConnectMetrics.currentMetricValueAsDouble(taskMetricsGroup.metricGroup(), "pause-ratio"), 1.0E-6d);
        Assert.assertEquals(12000.0d / 27000, mockConnectMetrics.currentMetricValueAsDouble(taskMetricsGroup.metricGroup(), "running-ratio"), 1.0E-6d);
    }

    protected void assertFailedMetric(WorkerTask.TaskMetricsGroup taskMetricsGroup) {
        Assert.assertEquals(AbstractStatus.State.FAILED, taskMetricsGroup.state());
    }

    protected void assertPausedMetric(WorkerTask.TaskMetricsGroup taskMetricsGroup) {
        Assert.assertEquals(AbstractStatus.State.PAUSED, taskMetricsGroup.state());
    }

    protected void assertRunningMetric(WorkerTask.TaskMetricsGroup taskMetricsGroup) {
        Assert.assertEquals(AbstractStatus.State.RUNNING, taskMetricsGroup.state());
    }

    protected void assertStoppedMetric(WorkerTask.TaskMetricsGroup taskMetricsGroup) {
        Assert.assertEquals(AbstractStatus.State.UNASSIGNED, taskMetricsGroup.state());
    }

    static {
        TASK_PROPS.put("task.class", TestSinkTask.class.getName());
        TASK_CONFIG = new TaskConfig(TASK_PROPS);
    }
}
