package org.apache.kafka.connect.runtime;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.MockTime;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({Worker.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest.class */
public class WorkerTest extends ThreadedTest {
    private static final String CONNECTOR_ID = "test-connector";
    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
    private WorkerConfig config;
    private Worker worker;
    private OffsetBackingStore offsetBackingStore = (OffsetBackingStore) PowerMock.createMock(OffsetBackingStore.class);

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$TestConnector.class */
    private static class TestConnector extends Connector {
        private TestConnector() {
        }

        public String version() {
            return "1.0";
        }

        public void start(Map<String, String> map) {
        }

        public Class<? extends Task> taskClass() {
            return null;
        }

        public List<Map<String, String>> taskConfigs(int i) {
            return null;
        }

        public void stop() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTest$TestSourceTask.class */
    private static class TestSourceTask extends SourceTask {
        public String version() {
            return "1.0";
        }

        public void start(Map<String, String> map) {
        }

        public List<SourceRecord> poll() throws InterruptedException {
            return null;
        }

        public void stop() {
        }
    }

    @Override // org.apache.kafka.connect.util.ThreadedTest
    @Before
    public void setup() {
        super.setup();
        HashMap hashMap = new HashMap();
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.key.converter.schemas.enable", "false");
        hashMap.put("internal.value.converter.schemas.enable", "false");
        this.config = new StandaloneConfig(hashMap);
    }

    @Test
    public void testAddRemoveConnector() throws Exception {
        this.offsetBackingStore.configure((Map) EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        this.offsetBackingStore.start();
        EasyMock.expectLastCall();
        Connector connector = (Connector) PowerMock.createMock(Connector.class);
        ConnectorContext connectorContext = (ConnectorContext) PowerMock.createMock(ConnectorContext.class);
        PowerMock.mockStatic(Worker.class);
        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
        EasyMock.expect(connector.version()).andReturn("1.0");
        HashMap hashMap = new HashMap();
        hashMap.put("topics", "foo,bar");
        hashMap.put("tasks.max", "1");
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", TestConnector.class.getName());
        connector.initialize(connectorContext);
        EasyMock.expectLastCall();
        connector.start(hashMap);
        EasyMock.expectLastCall();
        connector.stop();
        EasyMock.expectLastCall();
        this.offsetBackingStore.stop();
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(new MockTime(), this.config, this.offsetBackingStore);
        this.worker.start();
        ConnectorConfig connectorConfig = new ConnectorConfig(hashMap);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.addConnector(connectorConfig, connectorContext);
        Assert.assertEquals(new HashSet(Arrays.asList(CONNECTOR_ID)), this.worker.connectorNames());
        try {
            this.worker.addConnector(connectorConfig, connectorContext);
            Assert.fail("Should have thrown exception when trying to add connector with same name.");
        } catch (ConnectException e) {
        }
        this.worker.stopConnector(CONNECTOR_ID);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        PowerMock.verifyAll();
    }

    @Test(expected = ConnectException.class)
    public void testStopInvalidConnector() {
        this.offsetBackingStore.configure((Map) EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        this.offsetBackingStore.start();
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(new MockTime(), this.config, this.offsetBackingStore);
        this.worker.start();
        this.worker.stopConnector(CONNECTOR_ID);
    }

    @Test
    public void testReconfigureConnectorTasks() throws Exception {
        this.offsetBackingStore.configure((Map) EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        this.offsetBackingStore.start();
        EasyMock.expectLastCall();
        Connector connector = (Connector) PowerMock.createMock(Connector.class);
        ConnectorContext connectorContext = (ConnectorContext) PowerMock.createMock(ConnectorContext.class);
        PowerMock.mockStatic(Worker.class);
        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
        EasyMock.expect(connector.version()).andReturn("1.0");
        HashMap hashMap = new HashMap();
        hashMap.put("topics", "foo,bar");
        hashMap.put("tasks.max", "1");
        hashMap.put("name", CONNECTOR_ID);
        hashMap.put("connector.class", TestConnector.class.getName());
        connector.initialize(connectorContext);
        EasyMock.expectLastCall();
        connector.start(hashMap);
        EasyMock.expectLastCall();
        EasyMock.expect(connector.taskClass()).andReturn(TestSourceTask.class);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("foo", "bar");
        EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(hashMap2, hashMap2));
        connector.stop();
        EasyMock.expectLastCall();
        this.offsetBackingStore.stop();
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(new MockTime(), this.config, this.offsetBackingStore);
        this.worker.start();
        ConnectorConfig connectorConfig = new ConnectorConfig(hashMap);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.addConnector(connectorConfig, connectorContext);
        Assert.assertEquals(new HashSet(Arrays.asList(CONNECTOR_ID)), this.worker.connectorNames());
        try {
            this.worker.addConnector(connectorConfig, connectorContext);
            Assert.fail("Should have thrown exception when trying to add connector with same name.");
        } catch (ConnectException e) {
        }
        List connectorTaskConfigs = this.worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
        HashMap hashMap3 = new HashMap();
        hashMap3.put("foo", "bar");
        hashMap3.put("task.class", TestSourceTask.class.getName());
        hashMap3.put("topics", "foo,bar");
        Assert.assertEquals(2L, connectorTaskConfigs.size());
        Assert.assertEquals(hashMap3, connectorTaskConfigs.get(0));
        Assert.assertEquals(hashMap3, connectorTaskConfigs.get(1));
        this.worker.stopConnector(CONNECTOR_ID);
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());
        this.worker.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testAddRemoveTask() throws Exception {
        this.offsetBackingStore.configure((Map) EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        this.offsetBackingStore.start();
        EasyMock.expectLastCall();
        ConnectorTaskId connectorTaskId = new ConnectorTaskId("job", 0);
        TestSourceTask testSourceTask = (TestSourceTask) PowerMock.createMock(TestSourceTask.class);
        WorkerSourceTask workerSourceTask = (WorkerSourceTask) PowerMock.createMock(WorkerSourceTask.class);
        PowerMock.mockStatic(Worker.class);
        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(testSourceTask);
        EasyMock.expect(testSourceTask.version()).andReturn("1.0");
        PowerMock.expectNew(WorkerSourceTask.class, new Object[]{EasyMock.eq(connectorTaskId), EasyMock.eq(testSourceTask), EasyMock.anyObject(Converter.class), EasyMock.anyObject(Converter.class), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(Time.class)}).andReturn(workerSourceTask);
        HashMap hashMap = new HashMap();
        hashMap.put("task.class", TestSourceTask.class.getName());
        workerSourceTask.start(hashMap);
        EasyMock.expectLastCall();
        workerSourceTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(workerSourceTask.awaitStop(EasyMock.anyLong()))).andStubReturn(true);
        workerSourceTask.close();
        EasyMock.expectLastCall();
        this.offsetBackingStore.stop();
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(new MockTime(), this.config, this.offsetBackingStore);
        this.worker.start();
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.addTask(connectorTaskId, new TaskConfig(hashMap));
        Assert.assertEquals(new HashSet(Arrays.asList(connectorTaskId)), this.worker.taskIds());
        this.worker.stopTask(connectorTaskId);
        Assert.assertEquals(Collections.emptySet(), this.worker.taskIds());
        this.worker.stop();
        PowerMock.verifyAll();
    }

    @Test(expected = ConnectException.class)
    public void testStopInvalidTask() {
        this.offsetBackingStore.configure((Map) EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        this.offsetBackingStore.start();
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(new MockTime(), this.config, this.offsetBackingStore);
        this.worker.start();
        this.worker.stopTask(TASK_ID);
    }

    @Test
    public void testCleanupTasksOnStop() throws Exception {
        this.offsetBackingStore.configure((Map) EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        this.offsetBackingStore.start();
        EasyMock.expectLastCall();
        TestSourceTask testSourceTask = (TestSourceTask) PowerMock.createMock(TestSourceTask.class);
        WorkerSourceTask workerSourceTask = (WorkerSourceTask) PowerMock.createMock(WorkerSourceTask.class);
        PowerMock.mockStatic(Worker.class);
        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(testSourceTask);
        EasyMock.expect(testSourceTask.version()).andReturn("1.0");
        PowerMock.expectNew(WorkerSourceTask.class, new Object[]{EasyMock.eq(TASK_ID), EasyMock.eq(testSourceTask), EasyMock.anyObject(Converter.class), EasyMock.anyObject(Converter.class), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(Time.class)}).andReturn(workerSourceTask);
        HashMap hashMap = new HashMap();
        hashMap.put("task.class", TestSourceTask.class.getName());
        workerSourceTask.start(hashMap);
        EasyMock.expectLastCall();
        workerSourceTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(Boolean.valueOf(workerSourceTask.awaitStop(EasyMock.anyLong()))).andReturn(true);
        workerSourceTask.close();
        EasyMock.expectLastCall();
        this.offsetBackingStore.stop();
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.worker = new Worker(new MockTime(), this.config, this.offsetBackingStore);
        this.worker.start();
        this.worker.addTask(TASK_ID, new TaskConfig(hashMap));
        this.worker.stop();
        PowerMock.verifyAll();
    }
}
