package org.apache.kafka.connect.runtime;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerSourceTask;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.tracing.Tracer;
import org.apache.kafka.connect.runtime.tracing.TracingContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.powermock.reflect.Whitebox;

@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
@PowerMockIgnore({"javax.management.*", "org.apache.log4j.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSourceTaskTest.class */
public class WorkerSourceTaskTest extends ThreadedTest {
    /** Topic name constants available to the tests in this class. */
    private static final String TOPIC = "topic";
    private static final String OTHER_TOPIC = "other-topic";

    /** Single-threaded executor; tests submit the worker task to it so it runs off the test thread. */
    private ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Task ids for connector "job": index 0 is passed to the worker task, index 1 is used by the metrics test. */
    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);

    // The following four fields are (re)created in setup() for every test.
    private WorkerConfig config;
    private SourceConnectorConfig sourceConfig;
    private Plugins plugins;
    private MockConnectMetrics metrics;

    // Collaborators handed to the WorkerSourceTask constructor; all are mocks.
    @Mock
    private SourceTask sourceTask;

    @Mock
    private Converter keyConverter;

    @Mock
    private Converter valueConverter;

    @Mock
    private HeaderConverter headerConverter;

    @Mock
    private TransformationChain<SourceRecord> transformationChain;

    @Mock
    private KafkaProducer<byte[], byte[]> producer;

    @Mock
    private TopicAdmin admin;

    @Mock
    private CloseableOffsetStorageReader offsetReader;

    @Mock
    private OffsetStorageWriter offsetWriter;

    @Mock
    private ClusterConfigState clusterConfigState;

    /** The object under test; assigned by the createWorkerTask(...) helpers. */
    private WorkerSourceTask workerTask;

    @Mock
    private Future<RecordMetadata> sendFuture;

    /** Strict mock, so the order of the listener callbacks recorded by a test is verified. */
    @MockStrict
    private TaskStatus.Listener statusListener;

    @Mock
    private StatusBackingStore statusBackingStore;

    /** Capture created fresh in setup(). */
    private Capture<Callback> producerCallbacks;

    // TASK_CONFIG and RECORDS have no initializer here; they are assigned elsewhere in the class.
    private static final TaskConfig TASK_CONFIG;
    private static final List<SourceRecord> RECORDS;

    /** Test parameter: value written to "topic.creation.enable" in the worker properties. */
    private boolean enableTopicCreation;

    // Fixture values used to build SourceRecords and to state expectations on the mocks.
    private static final Map<String, Object> PARTITION = Collections.singletonMap("key", "partition".getBytes());
    private static final Map<String, Object> OFFSET = Collections.singletonMap("key", 12);
    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Integer KEY = -1;
    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
    private static final Long RECORD = 12L;
    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();

    /** Task properties passed to SourceTask.start(...) expectations; typed map instead of a raw HashMap. */
    private static final Map<String, String> TASK_PROPS = new HashMap<>();

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSourceTaskTest$TestSourceTask.class */
    /**
     * Abstract {@link SourceTask} subtype local to this test class. It declares no members
     * beyond a private no-argument constructor.
     */
    private abstract static class TestSourceTask extends SourceTask {
        private TestSourceTask() {
            super();
        }
    }

    /**
     * Supplies the values fed to the constructor by the parameterized runner:
     * first {@code false}, then {@code true}.
     *
     * @return the two boolean parameter values, in that order
     */
    @Parameterized.Parameters
    public static Collection<Boolean> parameters() {
        return Arrays.asList(Boolean.FALSE, Boolean.TRUE);
    }

    /**
     * Creates a test instance for one parameter value.
     *
     * @param z stored as {@code enableTopicCreation}; later written to the
     *          "topic.creation.enable" worker property by {@code workerProps()}
     */
    public WorkerSourceTaskTest(boolean z) {
        enableTopicCreation = z;
    }

    /**
     * Runs the parent setup and then rebuilds the per-test fixtures: plugins, worker config,
     * source connector config, the producer-callback capture and the mock metrics.
     */
    @Override
    public void setup() {
        super.setup();

        // Plugins and StandaloneConfig are both built from the same worker property map.
        final Map<String, String> props = workerProps();
        plugins = new Plugins(props);
        config = new StandaloneConfig(props);

        sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true);
        producerCallbacks = EasyMock.newCapture();
        metrics = new MockConnectMetrics();
    }

    /**
     * Builds the worker configuration properties used by {@code setup()}.
     *
     * <p>Uses a typed map instead of a raw {@code HashMap}, so the return statement no longer
     * relies on an unchecked conversion.
     *
     * @return a new mutable map with the converter classes, the offset storage file name and
     *         the "topic.creation.enable" flag taken from {@code enableTopicCreation}
     */
    private Map<String, String> workerProps() {
        Map<String, String> props = new HashMap<>();
        props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
        props.put("topic.creation.enable", String.valueOf(this.enableTopicCreation));
        return props;
    }

    /**
     * Builds source connector properties that declare two topic creation groups, "foo" and "bar".
     *
     * <p>Group "foo" includes exactly {@code str}; group "bar" includes ".*" but excludes
     * {@code str}. Uses a typed map instead of a raw {@code HashMap}, so the return statement
     * no longer relies on an unchecked conversion.
     *
     * @param str the topic name written to "topic" and to the group include/exclude entries
     * @return a new mutable map of connector properties
     */
    private Map<String, String> sourceConnectorPropsWithGroups(String str) {
        Map<String, String> props = new HashMap<>();
        props.put("name", "foo-connector");
        props.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        props.put("tasks.max", String.valueOf(1));
        props.put("topic", str);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("topic.creation.groups", String.join(",", "foo", "bar"));
        props.put("topic.creation.default.replication.factor", String.valueOf(1));
        props.put("topic.creation.default.partitions", String.valueOf(1));
        props.put("topic.creation.foo.include", str);
        props.put("topic.creation.bar.include", ".*");
        props.put("topic.creation.bar.exclude", str);
        return props;
    }

    /**
     * Releases per-test resources: stops the mock metrics (when setup created them) and shuts
     * down the executor.
     *
     * <p>Each test instance owns a fresh single-thread executor, so it is shut down here rather
     * than leaving its worker thread alive after the test. The shutdown runs in a
     * {@code finally} block so it still happens if stopping the metrics throws.
     */
    @After
    public void tearDown() {
        try {
            if (this.metrics != null) {
                this.metrics.stop();
            }
        } finally {
            this.executor.shutdownNow();
        }
    }

    /**
     * Creates the worker task with target state STARTED and
     * {@code RetryWithToleranceOperatorTest.NOOP_OPERATOR}.
     */
    private void createWorkerTask() {
        final TargetState initialState = TargetState.STARTED;
        createWorkerTask(initialState, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
    }

    /**
     * Creates the worker task with target state STARTED and
     * {@code RetryWithToleranceOperatorTest.ALL_OPERATOR}.
     */
    private void createWorkerTaskWithErrorToleration() {
        final TargetState initialState = TargetState.STARTED;
        createWorkerTask(initialState, RetryWithToleranceOperatorTest.ALL_OPERATOR);
    }

    /**
     * Creates the worker task with the given target state and
     * {@code RetryWithToleranceOperatorTest.NOOP_OPERATOR}.
     *
     * @param targetState initial target state handed to the worker task
     */
    private void createWorkerTask(TargetState targetState) {
        final RetryWithToleranceOperator operator = RetryWithToleranceOperatorTest.NOOP_OPERATOR;
        createWorkerTask(targetState, operator);
    }

    /**
     * Creates the worker task with the given target state and retry operator, using the mocked
     * key, value and header converter fields.
     *
     * @param targetState initial target state handed to the worker task
     * @param retryWithToleranceOperator operator handed to the worker task
     */
    private void createWorkerTask(TargetState targetState, RetryWithToleranceOperator retryWithToleranceOperator) {
        createWorkerTask(
            targetState,
            keyConverter,
            valueConverter,
            headerConverter,
            retryWithToleranceOperator);
    }

    /**
     * Creates the worker task with explicit converters and
     * {@code RetryWithToleranceOperatorTest.NOOP_OPERATOR}.
     *
     * @param targetState initial target state handed to the worker task
     * @param converter key converter
     * @param converter2 value converter
     * @param headerConverter header converter
     */
    private void createWorkerTask(TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter) {
        final RetryWithToleranceOperator operator = RetryWithToleranceOperatorTest.NOOP_OPERATOR;
        createWorkerTask(targetState, converter, converter2, headerConverter, operator);
    }

    /**
     * Instantiates the {@link WorkerSourceTask} under test and stores it in {@code workerTask}.
     *
     * <p>All collaborators other than the arguments come from this test's fields. The last
     * constructor argument is a callback that runs whatever it is given immediately, on the
     * calling thread.
     *
     * @param targetState initial target state
     * @param converter key converter
     * @param converter2 value converter
     * @param headerConverter header converter
     * @param retryWithToleranceOperator retry operator
     */
    private void createWorkerTask(TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, RetryWithToleranceOperator retryWithToleranceOperator) {
        workerTask = new WorkerSourceTask(
            taskId,
            sourceTask,
            statusListener,
            targetState,
            converter,
            converter2,
            headerConverter,
            transformationChain,
            producer,
            admin,
            TopicCreationGroup.configuredGroups(sourceConfig),
            offsetReader,
            offsetWriter,
            config,
            clusterConfigState,
            metrics,
            plugins.delegatingLoader(),
            Time.SYSTEM,
            retryWithToleranceOperator,
            statusBackingStore,
            task -> task.run());
    }

    /**
     * Starts a task whose target state is PAUSED and checks that the status listener receives
     * onPause, followed by onShutdown once the task is stopped.
     */
    @Test
    public void testStartPaused() throws Exception {
        final CountDownLatch pauseLatch = new CountDownLatch(1);

        createWorkerTask(TargetState.PAUSED);

        // Release the latch as soon as the listener is told the task is paused.
        statusListener.onPause(taskId);
        EasyMock.expectLastCall().andAnswer(() -> {
            pauseLatch.countDown();
            return null;
        });

        expectClose();

        statusListener.onShutdown(taskId);
        EasyMock.expectLastCall();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);
        Assert.assertTrue(pauseLatch.await(5L, TimeUnit.SECONDS));

        workerTask.stop();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();

        PowerMock.verifyAll();
    }

    /**
     * Runs the task until ten polls have happened, transitions it to PAUSED, and checks that at
     * most one further poll is counted during the following 100 ms before stopping the task.
     */
    @Test
    public void testPause() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final AtomicInteger pollCount = new AtomicInteger(0);
        final CountDownLatch pollLatch = expectPolls(10, pollCount);
        expectTopicCreation(TOPIC);

        statusListener.onPause(taskId);
        EasyMock.expectLastCall();

        sourceTask.stop();
        EasyMock.expectLastCall();

        offsetWriter.offset(PARTITION, OFFSET);
        PowerMock.expectLastCall();
        expectOffsetFlush(true);

        statusListener.onShutdown(taskId);
        EasyMock.expectLastCall();

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);
        Assert.assertTrue(awaitLatch(pollLatch));

        workerTask.transitionTo(TargetState.PAUSED);

        // Snapshot the counter right after requesting the pause, then allow a short grace period.
        final int pollsAtPause = pollCount.get();
        Thread.sleep(100L);
        final int pollsAfterPause = pollCount.get() - pollsAtPause;
        Assert.assertTrue(pollsAfterPause <= 1);

        workerTask.stop();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();

        PowerMock.verifyAll();
    }

    /**
     * Runs the task on the executor until ten polls have happened, stops it, and checks the
     * poll metrics with {@code assertPollMetrics(10)}.
     */
    @Test
    public void testPollsInBackground() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final CountDownLatch pollLatch = expectPolls(10);
        expectTopicCreation(TOPIC);

        sourceTask.stop();
        EasyMock.expectLastCall();

        offsetWriter.offset(PARTITION, OFFSET);
        PowerMock.expectLastCall();
        expectOffsetFlush(true);

        statusListener.onShutdown(taskId);
        EasyMock.expectLastCall();

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);

        Assert.assertTrue(awaitLatch(pollLatch));
        workerTask.stop();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();
        assertPollMetrics(10);

        PowerMock.verifyAll();
    }

    /**
     * Makes {@code poll()} throw a RuntimeException and expects the status listener to receive
     * onFailure with that same exception, followed by the task being stopped and closed.
     */
    @Test
    public void testFailureInPoll() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final CountDownLatch pollLatch = new CountDownLatch(1);
        final RuntimeException pollFailure = new RuntimeException();
        EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
            pollLatch.countDown();
            throw pollFailure;
        });

        statusListener.onFailure(taskId, pollFailure);
        EasyMock.expectLastCall();

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);

        Assert.assertTrue(awaitLatch(pollLatch));
        // No stop() call here: the task is expected to terminate on its own after the failure.
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();
        assertPollMetrics(0);

        PowerMock.verifyAll();
    }

    /**
     * Cancels the task while {@code poll()} is blocked, then lets {@code poll()} throw. No
     * onFailure or onShutdown expectation is recorded on the strict status listener.
     */
    @Test
    public void testFailureInPollAfterCancel() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final CountDownLatch pollLatch = new CountDownLatch(1);
        final CountDownLatch workerCancelLatch = new CountDownLatch(1);
        final RuntimeException pollFailure = new RuntimeException();
        EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
            pollLatch.countDown();
            // Hold poll() open until the test thread has cancelled the task.
            Assert.assertTrue(awaitLatch(workerCancelLatch));
            throw pollFailure;
        });

        // Expectations for cancel(): the offset reader is closed and the producer is closed with zero timeout.
        offsetReader.close();
        PowerMock.expectLastCall();

        producer.close(Duration.ZERO);
        PowerMock.expectLastCall();

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);

        Assert.assertTrue(awaitLatch(pollLatch));
        workerTask.cancel();
        workerCancelLatch.countDown();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();
        assertPollMetrics(0);

        PowerMock.verifyAll();
    }

    /**
     * Stops the task while {@code poll()} is blocked, then lets {@code poll()} throw. The
     * status listener is expected to receive onShutdown; no onFailure expectation is recorded.
     */
    @Test
    public void testFailureInPollAfterStop() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final CountDownLatch pollLatch = new CountDownLatch(1);
        final CountDownLatch workerStopLatch = new CountDownLatch(1);
        final RuntimeException pollFailure = new RuntimeException();
        EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
            pollLatch.countDown();
            // Hold poll() open until the test thread has requested the stop.
            Assert.assertTrue(awaitLatch(workerStopLatch));
            throw pollFailure;
        });

        statusListener.onShutdown(taskId);
        EasyMock.expectLastCall();

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);

        Assert.assertTrue(awaitLatch(pollLatch));
        workerTask.stop();
        workerStopLatch.countDown();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();
        assertPollMetrics(0);

        PowerMock.verifyAll();
    }

    /**
     * Uses the empty-poll expectation helper, then checks that an explicit
     * {@code commitOffsets()} returns true and that {@code assertPollMetrics(0)} holds after
     * the task is stopped.
     */
    @Test
    public void testPollReturnsNoRecords() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger());
        // One flush expectation for the explicit commit below ...
        expectOffsetFlush(true);

        sourceTask.stop();
        EasyMock.expectLastCall();
        // ... and a second flush expectation recorded after the stop() expectation.
        expectOffsetFlush(true);

        statusListener.onShutdown(taskId);
        EasyMock.expectLastCall();

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);

        Assert.assertTrue(awaitLatch(pollLatch));
        Assert.assertTrue(workerTask.commitOffsets());
        workerTask.stop();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();
        assertPollMetrics(0);

        PowerMock.verifyAll();
    }

    /**
     * Waits for one poll, then checks that an explicit {@code commitOffsets()} returns true
     * and that {@code assertPollMetrics(1)} holds after the task is stopped.
     */
    @Test
    public void testCommit() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final CountDownLatch pollLatch = expectPolls(1);
        expectOffsetFlush(true);

        // The offset may be written more than once, so allow any positive number of calls.
        offsetWriter.offset(PARTITION, OFFSET);
        PowerMock.expectLastCall().atLeastOnce();

        expectTopicCreation(TOPIC);

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        statusListener.onShutdown(taskId);
        EasyMock.expectLastCall();

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);

        Assert.assertTrue(awaitLatch(pollLatch));
        Assert.assertTrue(workerTask.commitOffsets());
        workerTask.stop();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();
        assertPollMetrics(1);

        PowerMock.verifyAll();
    }

    /**
     * Same flow as the commit test, except that the second offset flush expectation is
     * recorded with {@code expectOffsetFlush(false)}. The explicit {@code commitOffsets()}
     * call is still asserted to return true.
     */
    @Test
    public void testCommitFailure() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final CountDownLatch pollLatch = expectPolls(1);
        expectOffsetFlush(true);

        // The offset may be written more than once, so allow any positive number of calls.
        offsetWriter.offset(PARTITION, OFFSET);
        PowerMock.expectLastCall().atLeastOnce();

        expectTopicCreation(TOPIC);

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(false);

        statusListener.onShutdown(taskId);
        EasyMock.expectLastCall();

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> taskFuture = executor.submit((Runnable) workerTask);

        Assert.assertTrue(awaitLatch(pollLatch));
        Assert.assertTrue(workerTask.commitOffsets());
        workerTask.stop();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        taskFuture.get();
        assertPollMetrics(1);

        PowerMock.verifyAll();
    }

    /**
     * Sends a single record through {@code sendRecords} and checks that the producer record
     * carries the serialized key and value.
     *
     * <p>The byte arrays are compared with {@code assertArrayEquals}: {@code assertEquals} on
     * arrays only checks reference identity, so it would not detect equal content held in a
     * different array instance. The record list and the captured producer record are used
     * with their generic types instead of raw types.
     */
    @Test
    public void testSendRecordsConvertsData() throws Exception {
        createWorkerTask();

        List<SourceRecord> records = new ArrayList<>();
        // Can just use the same record for key and value
        records.add(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));

        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();

        expectTopicCreation("topic");

        PowerMock.replayAll();

        // The pending batch is injected directly and sendRecords is invoked reflectively.
        Whitebox.setInternalState(this.workerTask, "toSend", records);
        Whitebox.invokeMethod(this.workerTask, "sendRecords");
        Assert.assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
        Assert.assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());

        PowerMock.verifyAll();
    }

    /**
     * Sends a record that carries an explicit timestamp and checks that the captured producer
     * record reports the same timestamp.
     */
    @Test
    public void testSendRecordsPropagatesTimestamp() throws Exception {
        final Long timestamp = Long.valueOf(System.currentTimeMillis());

        createWorkerTask();

        final SourceRecord record =
            new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp);
        final List<SourceRecord> records = Collections.singletonList(record);

        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();

        expectTopicCreation(TOPIC);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", records);
        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertEquals(timestamp, sent.getValue().timestamp());

        PowerMock.verifyAll();
    }

    /**
     * Sends a record with timestamp -3 and expects {@code sendRecords} to throw
     * {@link InvalidRecordException} without any producer record having been captured.
     */
    @Test
    public void testSendRecordsCorruptTimestamp() throws Exception {
        createWorkerTask();

        final SourceRecord record =
            new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, -3L);
        final List<SourceRecord> records = Collections.singletonList(record);

        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", records);
        Assert.assertThrows(InvalidRecordException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
        Assert.assertFalse(sent.hasCaptured());

        PowerMock.verifyAll();
    }

    /**
     * Sends a record with timestamp -1 and checks that the captured producer record has a
     * null timestamp.
     */
    @Test
    public void testSendRecordsNoTimestamp() throws Exception {
        createWorkerTask();

        final SourceRecord record =
            new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, -1L);
        final List<SourceRecord> records = Collections.singletonList(record);

        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();

        expectTopicCreation(TOPIC);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", records);
        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertNull(sent.getValue().timestamp());

        PowerMock.verifyAll();
    }

    /**
     * Sends three records where the second send first fails synchronously with a
     * {@link TimeoutException}. After the first {@code sendRecords} call the second and third
     * records must remain in "toSend"; after the second call "toSend" must be null.
     */
    @Test
    public void testSendRecordsRetries() throws Exception {
        createWorkerTask();

        // Differentiate only by Kafka partition so we can reuse conversion expectations
        final SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        final SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        final SourceRecord third = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectTopicCreation(TOPIC);

        // First round: one successful send, then a synchronous failure.
        expectSendRecordOnce();
        expectSendRecordSyncFailure(new TimeoutException("retriable sync failure"));

        // Second round: the two remaining records go through.
        expectSendRecordOnce();
        expectSendRecordOnce();

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second, third));
        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertEquals(Arrays.asList(second, third), Whitebox.getInternalState(workerTask, "toSend"));

        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertNull(Whitebox.getInternalState(workerTask, "toSend"));

        PowerMock.verifyAll();
    }

    /**
     * Sets up the producer-callback-failure expectation and checks that {@code sendRecords}
     * throws {@link ConnectException} for a two-record batch.
     */
    @Test
    public void testSendRecordsProducerCallbackFail() throws Exception {
        createWorkerTask();

        final SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        final SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectTopicCreation(TOPIC);

        expectSendRecordProducerCallbackFail();

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Assert.assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
    }

    /**
     * Makes {@code producer.send} throw a {@link KafkaException} wrapping an
     * {@link InvalidTopicException} and checks that {@code sendRecords} throws
     * {@link ConnectException}. The body only runs when {@code enableTopicCreation} is true.
     */
    @Test
    public void testSendRecordsProducerSendFailsImmediately() {
        if (!enableTopicCreation) {
            // Nothing is exercised for the parameter value with topic creation disabled.
            return;
        }

        createWorkerTask();

        final SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        final SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();
        expectTopicCreation(TOPIC);

        final KafkaException sendFailure =
            new KafkaException("Producer closed while send in progress", new InvalidTopicException("topic"));
        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andThrow(sendFailure);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Assert.assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
    }

    /**
     * Sends three records where the expectation for the second one is recorded with
     * {@code expectSendRecordTaskCommitRecordFail(false)}, and checks that "toSend" is null
     * after a single {@code sendRecords} call.
     */
    @Test
    public void testSendRecordsTaskCommitRecordFail() throws Exception {
        createWorkerTask();

        // Differentiate only by Kafka partition so we can reuse conversion expectations
        final SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        final SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        final SourceRecord third = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectTopicCreation(TOPIC);

        expectSendRecordOnce();
        expectSendRecordTaskCommitRecordFail(false);
        expectSendRecordOnce();

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second, third));
        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertNull(Whitebox.getInternalState(workerTask, "toSend"));

        PowerMock.verifyAll();
    }

    /**
     * Creates the task with {@code RetryWithToleranceOperatorTest.ALL_OPERATOR}, makes the
     * second send fail in the producer callback, and expects {@code sendRecords} to return
     * normally with {@code commitRecord} invoked with null record metadata.
     */
    @Test
    public void testSourceTaskIgnoresProducerException() throws Exception {
        createWorkerTaskWithErrorToleration();
        expectTopicCreation(TOPIC);

        final SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        final SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectSendRecordOnce();
        expectSendRecordProducerCallbackFail();

        // Any record is accepted, but the metadata argument must be null.
        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), (RecordMetadata) EasyMock.isNull());
        EasyMock.expectLastCall();

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Whitebox.invokeMethod(workerTask, "sendRecords");

        PowerMock.verifyAll();
    }

    /**
     * Blocks inside {@code SourceTask.start} until the test thread has called {@code stop()}
     * on the worker task, then checks that the task still stops and shuts down cleanly.
     */
    @Test
    public void testSlowTaskStart() throws Exception {
        final CountDownLatch startupLatch = new CountDownLatch(1);
        final CountDownLatch finishStartupLatch = new CountDownLatch(1);

        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall().andAnswer(() -> {
            // Signal that start() was entered, then wait for the test thread to release it.
            startupLatch.countDown();
            Assert.assertTrue(awaitLatch(finishStartupLatch));
            return null;
        });

        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        statusListener.onShutdown(taskId);
        EasyMock.expectLastCall();

        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> workerTaskFuture = executor.submit((Runnable) workerTask);

        // stop() is requested while start() is still blocked.
        Assert.assertTrue(awaitLatch(startupLatch));
        workerTask.stop();
        finishStartupLatch.countDown();
        Assert.assertTrue(workerTask.awaitStop(1000L));

        workerTaskFuture.get();

        PowerMock.verifyAll();
    }

    /**
     * Checks that {@code cancel()} closes the offset reader and closes the producer with a
     * zero timeout.
     */
    @Test
    public void testCancel() {
        createWorkerTask();

        offsetReader.close();
        PowerMock.expectLastCall();

        producer.close(Duration.ZERO);
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        workerTask.cancel();

        PowerMock.verifyAll();
    }

    /**
     * Records polls and writes on two independent metrics groups, checks the values reported for
     * the first group, closes it, and then checks that the second group's values are unaffected.
     */
    @Test
    public void testMetricsGroup() {
        WorkerSourceTask.SourceTaskMetricsGroup group = new WorkerSourceTask.SourceTaskMetricsGroup(taskId, metrics);
        WorkerSourceTask.SourceTaskMetricsGroup group1 = new WorkerSourceTask.SourceTaskMetricsGroup(taskId1, metrics);

        for (int n = 0; n < 10; n++) {
            group.recordPoll(100, 1000 + n * 100);
            group.recordWrite(10);
        }
        for (int n = 0; n < 20; n++) {
            group1.recordPoll(100, 1000 + n * 100);
            group1.recordWrite(10);
        }

        Assert.assertEquals(1900.0d, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
        Assert.assertEquals(1450.0d, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
        Assert.assertEquals(33.333d, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
        Assert.assertEquals(1000.0d, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
        Assert.assertEquals(3.3333d, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
        Assert.assertEquals(100.0d, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
        Assert.assertEquals(900.0d, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d);

        group.close();

        // After close, no remaining metric name may belong to the closed group.
        for (MetricName remaining : group.metricGroup().metrics().metrics().keySet()) {
            Assert.assertFalse(group.metricGroup().groupId().includes(remaining));
        }
        // None of these sensors may be registered after the close.
        String[] absentSensors = {
            "sink-record-read",
            "sink-record-send",
            "sink-record-active-count",
            "partition-count",
            "offset-seq-number",
            "offset-commit-completion",
            "offset-commit-completion-skip",
            "put-batch-time",
        };
        for (String sensorName : absentSensors) {
            Assert.assertNull(group.metricGroup().metrics().getSensor(sensorName));
        }

        Assert.assertEquals(2900.0d, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
        Assert.assertEquals(1950.0d, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
        Assert.assertEquals(66.667d, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
        Assert.assertEquals(2000.0d, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d);
        Assert.assertEquals(6.667d, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
        Assert.assertEquals(200.0d, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
        Assert.assertEquals(1800.0d, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
    }

    /**
     * Sends one record carrying a single string header and checks that the captured producer
     * record has the serialized key, the serialized value and the expected Kafka headers.
     */
    @Test
    public void testHeaders() throws Exception {
        RecordHeaders expectedHeaders = new RecordHeaders();
        expectedHeaders.add("header_key", "header_value".getBytes());

        ConnectHeaders sourceHeaders = new ConnectHeaders();
        sourceHeaders.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));

        createWorkerTask();

        List<SourceRecord> pending = new ArrayList<>();
        pending.add(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, (Long) null, sourceHeaders));

        expectTopicCreation("topic");

        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord("topic", true, true, true, true, expectedHeaders);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", pending);
        Whitebox.invokeMethod(workerTask, "sendRecords");

        Assert.assertEquals(SERIALIZED_KEY, sent.getValue().key());
        Assert.assertEquals(SERIALIZED_RECORD, sent.getValue().value());
        Assert.assertEquals(expectedHeaders, sent.getValue().headers());

        PowerMock.verifyAll();
    }

    /**
     * Uses {@code SampleConverterWithHeaders} as the value converter and sends two records whose
     * "encoding" header names a charset; checks that each captured value is encoded in the
     * charset named by its own header and that the header itself is forwarded.
     */
    @Test
    public void testHeadersWithCustomConverter() throws Exception {
        StringConverter stringConverter = new StringConverter();
        createWorkerTask(TargetState.STARTED, stringConverter, new SampleConverterWithHeaders(), stringConverter);

        List<SourceRecord> pending = new ArrayList<>();

        String latin2Text = "Árvíztűrő tükörfúrógép";
        ConnectHeaders latin2Headers = new ConnectHeaders();
        latin2Headers.addString("encoding", "latin2");
        pending.add(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, latin2Text, (Long) null, latin2Headers));

        String koi8Text = "Тестовое сообщение";
        ConnectHeaders koi8Headers = new ConnectHeaders();
        koi8Headers.addString("encoding", "koi8_r");
        pending.add(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, koi8Text, (Long) null, koi8Headers));

        expectTopicCreation("topic");

        Capture<ProducerRecord<byte[], byte[]>> sentA = expectSendRecord("topic", false, true, true, false, null);
        Capture<ProducerRecord<byte[], byte[]>> sentB = expectSendRecord("topic", false, true, true, false, null);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", pending);
        Whitebox.invokeMethod(workerTask, "sendRecords");

        // Byte arrays are wrapped in ByteBuffers so that assertEquals compares their contents.
        Assert.assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentA.getValue().key()));
        Assert.assertEquals(ByteBuffer.wrap(latin2Text.getBytes("latin2")), ByteBuffer.wrap(sentA.getValue().value()));
        Assert.assertEquals("latin2", new String(sentA.getValue().headers().lastHeader("encoding").value()));

        Assert.assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentB.getValue().key()));
        Assert.assertEquals(ByteBuffer.wrap(koi8Text.getBytes("koi8_r")), ByteBuffer.wrap(sentB.getValue().value()));
        Assert.assertEquals("koi8_r", new String(sentB.getValue().headers().lastHeader("encoding").value()));

        PowerMock.verifyAll();
    }

    /**
     * Makes {@code poll()} throw and then makes the offset flush throw as well; the listener is
     * expected to be notified with the exception thrown by {@code poll()}, and the poll metrics
     * are expected to show no records.
     */
    @Test
    public void testSuppressCloseErrors() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        statusListener.onStartup(taskId);
        EasyMock.expectLastCall();

        final CountDownLatch pollCalled = new CountDownLatch(1);
        final RuntimeException pollFailure = new RuntimeException();
        EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
            pollCalled.countDown();
            throw pollFailure;
        });

        statusListener.onFailure(taskId, pollFailure);
        EasyMock.expectLastCall();

        sourceTask.stop();
        EasyMock.expectLastCall();
        // The failure raised while flushing offsets is a different exception from pollFailure.
        EasyMock.expect(offsetWriter.beginFlush()).andThrow(new ConnectException("no soup for u"));
        expectClose();

        PowerMock.replayAll();

        workerTask.initialize(TASK_CONFIG);
        Future<?> running = executor.submit((Runnable) workerTask);

        Assert.assertTrue(awaitLatch(pollCalled));
        Assert.assertTrue(workerTask.awaitStop(1000L));
        running.get();

        assertPollMetrics(0);
    }

    /**
     * Sends two records to a topic that {@code describeTopics} reports as already existing.
     * No {@code createOrFindTopics} expectation is registered for this scenario.
     *
     * <p>Only exercised when topic creation is enabled for this test instance.
     */
    @Test
    public void testTopicCreateWhenTopicExists() throws Exception {
        if (!this.enableTopicCreation) {
            // Nothing to check when topic creation is disabled.
            return;
        }

        createWorkerTask();

        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();

        // Describe reports the topic as present with a single partition.
        TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, (Node) null, Collections.emptyList(), Collections.emptyList());
        TopicDescription description = new TopicDescription("topic", false, Collections.singletonList(partitionInfo));
        EasyMock.expect(this.admin.describeTopics("topic")).andReturn(Collections.singletonMap("topic", description));

        expectSendRecordTaskCommitRecordSucceed(false);
        expectSendRecordTaskCommitRecordSucceed(false);

        PowerMock.replayAll();

        Whitebox.setInternalState(this.workerTask, "toSend", Arrays.asList(record1, record2));
        Whitebox.invokeMethod(this.workerTask, "sendRecords");
    }

    /**
     * Makes the first {@code describeTopics} call fail with a retriable error and checks that
     * the whole batch stays in {@code toSend} until a second {@code sendRecords} call succeeds.
     */
    @Test
    public void testSendRecordsTopicDescribeRetries() throws Exception {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();
        // First describe attempt times out ...
        EasyMock.expect(admin.describeTopics("topic"))
                .andThrow(new RetriableException(new java.util.concurrent.TimeoutException("timeout")));
        // ... the second attempt goes through describe + create.
        expectTopicCreation("topic");
        expectSendRecordTaskCommitRecordSucceed(false);
        expectSendRecordTaskCommitRecordSucceed(false);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertEquals(Arrays.asList(first, second), Whitebox.getInternalState(workerTask, "toSend"));

        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertNull(Whitebox.getInternalState(workerTask, "toSend"));
    }

    /**
     * Makes the first {@code createOrFindTopics} call fail with a retriable error and checks
     * that the whole batch stays in {@code toSend} until a second {@code sendRecords} call
     * succeeds.
     */
    @Test
    public void testSendRecordsTopicCreateRetries() throws Exception {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();
        // First attempt: the topic is not found and the create call times out.
        EasyMock.expect(admin.describeTopics("topic")).andReturn(Collections.emptyMap());
        Capture<NewTopic> failedCreate = EasyMock.newCapture();
        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(failedCreate)))
                .andThrow(new RetriableException(new java.util.concurrent.TimeoutException("timeout")));
        // Second attempt: describe + create succeed.
        expectTopicCreation("topic");
        expectSendRecordTaskCommitRecordSucceed(false);
        expectSendRecordTaskCommitRecordSucceed(false);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertEquals(Arrays.asList(first, second), Whitebox.getInternalState(workerTask, "toSend"));

        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertNull(Whitebox.getInternalState(workerTask, "toSend"));
    }

    /**
     * Sends two records to "topic" followed by one to {@code OTHER_TOPIC}, whose first
     * {@code describeTopics} call fails with a retriable error; only the third record is
     * expected to remain in {@code toSend} before the second {@code sendRecords} call.
     */
    @Test
    public void testSendRecordsTopicDescribeRetriesMidway() throws Exception {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord third = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls(OTHER_TOPIC);

        // The first two records go out on the first attempt.
        expectTopicCreation("topic");
        expectSendRecordTaskCommitRecordSucceed(false);
        expectSendRecordTaskCommitRecordSucceed(false);

        // Describing the other topic times out once, then succeeds.
        EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
                .andThrow(new RetriableException(new java.util.concurrent.TimeoutException("timeout")));
        expectTopicCreation(OTHER_TOPIC);
        expectSendRecord(OTHER_TOPIC, false, true, true, true, emptyHeaders());

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second, third));
        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertEquals(Arrays.asList(third), Whitebox.getInternalState(workerTask, "toSend"));

        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertNull(Whitebox.getInternalState(workerTask, "toSend"));

        PowerMock.verifyAll();
    }

    /**
     * Sends two records to "topic" followed by one to {@code OTHER_TOPIC}, whose first
     * {@code createOrFindTopics} call fails with a retriable error; only the third record is
     * expected to remain in {@code toSend} before the second {@code sendRecords} call.
     */
    @Test
    public void testSendRecordsTopicCreateRetriesMidway() throws Exception {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord third = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls(OTHER_TOPIC);

        // The first two records go out on the first attempt.
        expectTopicCreation("topic");
        expectSendRecordTaskCommitRecordSucceed(false);
        expectSendRecordTaskCommitRecordSucceed(false);

        // The other topic is not found and its creation times out once, then succeeds.
        EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
        Capture<NewTopic> failedCreate = EasyMock.newCapture();
        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(failedCreate)))
                .andThrow(new RetriableException(new java.util.concurrent.TimeoutException("timeout")));
        expectTopicCreation(OTHER_TOPIC);
        expectSendRecord(OTHER_TOPIC, false, true, true, true, emptyHeaders());

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second, third));
        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertEquals(Arrays.asList(third), Whitebox.getInternalState(workerTask, "toSend"));

        Whitebox.invokeMethod(workerTask, "sendRecords");
        Assert.assertNull(Whitebox.getInternalState(workerTask, "toSend"));

        PowerMock.verifyAll();
    }

    /**
     * Makes {@code describeTopics} throw a non-retriable {@code ConnectException} and expects
     * {@code sendRecords} to throw a {@code ConnectException}.
     */
    @Test
    public void testTopicDescribeFails() {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();
        EasyMock.expect(admin.describeTopics("topic"))
                .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Assert.assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
    }

    /**
     * Makes {@code createOrFindTopics} throw a non-retriable {@code ConnectException} and
     * expects {@code sendRecords} to throw a {@code ConnectException} after the creation request
     * was captured.
     */
    @Test
    public void testTopicCreateFails() {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();
        EasyMock.expect(admin.describeTopics("topic")).andReturn(Collections.emptyMap());

        Capture<NewTopic> requestedTopic = EasyMock.newCapture();
        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic)))
                .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Assert.assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
        Assert.assertTrue(requestedTopic.hasCaptured());
    }

    /**
     * Makes {@code createOrFindTopics} return {@code TopicAdmin.EMPTY_CREATION} and expects
     * {@code sendRecords} to throw a {@code ConnectException} after the creation request was
     * captured.
     */
    @Test
    public void testTopicCreateFailsWithExceptionWhenCreateReturnsTopicNotCreatedOrFound() {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();
        EasyMock.expect(admin.describeTopics("topic")).andReturn(Collections.emptyMap());

        Capture<NewTopic> requestedTopic = EasyMock.newCapture();
        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic)))
                .andReturn(TopicAdmin.EMPTY_CREATION);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Assert.assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
        Assert.assertTrue(requestedTopic.hasCaptured());
    }

    /**
     * Has {@code createOrFindTopics} return the response built by {@code foundTopic("topic")}
     * and sends two records; the send and commit expectations are registered for both.
     */
    @Test
    public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() throws Exception {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();
        EasyMock.expect(admin.describeTopics("topic")).andReturn(Collections.emptyMap());

        Capture<NewTopic> requestedTopic = EasyMock.newCapture();
        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic))).andReturn(foundTopic("topic"));

        expectSendRecordTaskCommitRecordSucceed(false);
        expectSendRecordTaskCommitRecordSucceed(false);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Whitebox.invokeMethod(workerTask, "sendRecords");
    }

    /**
     * Has {@code createOrFindTopics} return the response built by {@code createdTopic("topic")}
     * and sends two records; the send and commit expectations are registered for both.
     */
    @Test
    public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() throws Exception {
        if (!enableTopicCreation) {
            return;
        }

        createWorkerTask();

        SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

        expectPreliminaryCalls();
        EasyMock.expect(admin.describeTopics("topic")).andReturn(Collections.emptyMap());

        Capture<NewTopic> requestedTopic = EasyMock.newCapture();
        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic))).andReturn(createdTopic("topic"));

        expectSendRecordTaskCommitRecordSucceed(false);
        expectSendRecordTaskCommitRecordSucceed(false);

        PowerMock.replayAll();

        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
        Whitebox.invokeMethod(workerTask, "sendRecords");
    }

    /**
     * Builds a creation response whose first set holds only {@code str} and whose second set is
     * empty.
     * NOTE(review): presumably the first set is "newly created" and the second "already
     * existing" - confirm against {@code TopicAdmin.TopicCreationResponse}.
     */
    private TopicAdmin.TopicCreationResponse createdTopic(String str) {
        return new TopicAdmin.TopicCreationResponse(
                Collections.singleton(str),
                Collections.emptySet());
    }

    /**
     * Builds a creation response whose first set is empty and whose second set holds only
     * {@code str}.
     * NOTE(review): presumably the first set is "newly created" and the second "already
     * existing" - confirm against {@code TopicAdmin.TopicCreationResponse}.
     */
    private TopicAdmin.TopicCreationResponse foundTopic(String str) {
        return new TopicAdmin.TopicCreationResponse(
                Collections.emptySet(),
                Collections.singleton(str));
    }

    /** Registers the preliminary conversion and transformation expectations for "topic". */
    private void expectPreliminaryCalls() {
        final String defaultTopic = "topic";
        expectPreliminaryCalls(defaultTopic);
    }

    /**
     * Registers stubbed header/key/value conversions for the given topic (with empty headers)
     * plus a single transformation-chain application.
     *
     * @param str topic name passed to the converter expectations
     */
    private void expectPreliminaryCalls(String str) {
        final RecordHeaders noHeaders = emptyHeaders();
        expectConvertHeadersAndKeyValue(str, true, noHeaders);
        expectApplyTransformationChain(false);
    }

    /**
     * Stubs {@code sourceTask.poll()} to return an empty list on every call.
     *
     * @param i             number of polls the returned latch counts down from
     * @param atomicInteger counter incremented on every poll
     * @return a latch that reaches zero once {@code poll()} has been called {@code i} times
     */
    private CountDownLatch expectEmptyPolls(int i, AtomicInteger atomicInteger) throws InterruptedException {
        final CountDownLatch pollsRemaining = new CountDownLatch(i);
        // Registered as a stub, so the number of calls is not limited by the expectation itself.
        EasyMock.expect(sourceTask.poll()).andStubAnswer(() -> {
            atomicInteger.incrementAndGet();
            pollsRemaining.countDown();
            Thread.sleep(10L);
            return Collections.emptyList();
        });
        return pollsRemaining;
    }

    /**
     * Stubs {@code sourceTask.poll()} to return {@code RECORDS} on every call and registers
     * any-times send expectations for the records it produces.
     *
     * @param i             number of polls the returned latch counts down from
     * @param atomicInteger counter incremented on every poll
     * @return a latch that reaches zero once {@code poll()} has been called {@code i} times
     */
    private CountDownLatch expectPolls(int i, AtomicInteger atomicInteger) throws InterruptedException {
        final CountDownLatch pollsRemaining = new CountDownLatch(i);
        // Registered as a stub, so the number of calls is not limited by the expectation itself.
        EasyMock.expect(sourceTask.poll()).andStubAnswer(() -> {
            atomicInteger.incrementAndGet();
            pollsRemaining.countDown();
            Thread.sleep(10L);
            return RECORDS;
        });
        expectSendRecordAnyTimes();
        return pollsRemaining;
    }

    /**
     * Same as {@link #expectPolls(int, AtomicInteger)} with a throw-away poll counter.
     *
     * @param i number of polls the returned latch counts down from
     */
    private CountDownLatch expectPolls(int i) throws InterruptedException {
        final AtomicInteger unusedCounter = new AtomicInteger();
        return expectPolls(i, unusedCounter);
    }

    /**
     * Registers a one-shot conversion and transformation, then makes the next
     * {@code producer.send} call throw synchronously.
     *
     * @param th the throwable raised by {@code producer.send}
     */
    private void expectSendRecordSyncFailure(Throwable th) {
        expectConvertHeadersAndKeyValue(false);
        expectApplyTransformationChain(false);

        EasyMock.expect(
                producer.send(EasyMock.anyObject(ProducerRecord.class), EasyMock.anyObject(Callback.class)))
                .andThrow(th);
    }

    /** Registers a successful send-and-commit expectation that may be matched any number of times. */
    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException {
        final boolean anyTimes = true;
        return expectSendRecordTaskCommitRecordSucceed(anyTimes);
    }

    /** Registers a successful send-and-commit expectation that is matched once. */
    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() throws InterruptedException {
        final boolean anyTimes = false;
        return expectSendRecordTaskCommitRecordSucceed(anyTimes);
    }

    /** Registers a one-shot send to "topic" whose producer callback is completed with an error. */
    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
        final RecordHeaders noHeaders = emptyHeaders();
        return expectSendRecord("topic", false, false, false, true, noHeaders);
    }

    /**
     * Registers a send to "topic" whose producer callback succeeds and whose
     * {@code commitRecord} call succeeds.
     *
     * @param z {@code true} to register the expectations as any-times stubs, {@code false} for once
     */
    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean z) throws InterruptedException {
        final RecordHeaders noHeaders = emptyHeaders();
        return expectSendRecord("topic", z, true, true, true, noHeaders);
    }

    /**
     * Registers a send to "topic" whose producer callback succeeds but whose
     * {@code commitRecord} call throws.
     *
     * @param z {@code true} to register the expectations as any-times stubs, {@code false} for once
     */
    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean z) throws InterruptedException {
        final RecordHeaders noHeaders = emptyHeaders();
        return expectSendRecord("topic", z, true, false, true, noHeaders);
    }

    /**
     * Registers the expectations for sending one record through the producer.
     *
     * @param str     topic used for the converter expectations
     * @param z       {@code true} to register stubs (any number of calls), {@code false} for one call
     * @param z2      whether producer callbacks complete successfully ({@code true}) or with a
     *                {@code TopicAuthorizationException} ({@code false})
     * @param z3      whether the follow-up {@code commitRecord} call succeeds; only used when
     *                {@code z2} is {@code true}
     * @param z4      whether converter expectations should be registered at all
     * @param headers headers passed to the converter expectations
     * @return the capture that receives the record handed to {@code producer.send}
     */
    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(String str, boolean z, boolean z2, boolean z3, boolean z4, Headers headers) throws InterruptedException {
        if (z4) {
            expectConvertHeadersAndKeyValue(str, z, headers);
        }
        expectApplyTransformationChain(z);

        Capture<ProducerRecord<byte[], byte[]>> sentRecord = EasyMock.newCapture();
        IExpectationSetters sendCall = EasyMock.expect(this.producer.send((ProducerRecord) EasyMock.capture(sentRecord), (Callback) EasyMock.capture(this.producerCallbacks)));

        // On every send, complete all callbacks captured so far and then clear the capture.
        IAnswer completeCallbacks = () -> {
            synchronized (producerCallbacks) {
                for (Callback pending : producerCallbacks.getValues()) {
                    if (!z2) {
                        pending.onCompletion((RecordMetadata) null, new TopicAuthorizationException("foo"));
                        continue;
                    }
                    pending.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0L, 0, 0L, 0, 0), (Exception) null);
                }
                producerCallbacks.reset();
            }
            return sendFuture;
        };

        if (z) {
            sendCall.andStubAnswer(completeCallbacks);
        } else {
            sendCall.andAnswer(completeCallbacks);
        }

        // Commit and topic-status expectations are only registered when the send succeeds.
        if (z2) {
            expectTaskCommitRecordWithOffset(z, z3);
            expectTaskGetTopic(z);
        }
        return sentRecord;
    }

    /**
     * Registers converter expectations for "topic" with empty headers.
     *
     * @param z {@code true} to register stubs, {@code false} for one-shot expectations
     */
    private void expectConvertHeadersAndKeyValue(boolean z) {
        final RecordHeaders noHeaders = emptyHeaders();
        expectConvertHeadersAndKeyValue("topic", z, noHeaders);
    }

    /**
     * Registers expectations on the header, key and value converters.
     *
     * <p>Each header is expected to be converted from its string form back to its own raw
     * bytes; the key and value converters return {@code SERIALIZED_KEY} and
     * {@code SERIALIZED_RECORD} respectively.
     *
     * @param str     topic passed to the converters
     * @param z       {@code true} to register stubs, {@code false} for one-shot expectations
     * @param headers headers to register header-converter expectations for
     */
    private void expectConvertHeadersAndKeyValue(String str, boolean z, Headers headers) {
        for (Header header : headers) {
            IExpectationSetters<byte[]> headerCall = EasyMock.expect(
                    headerConverter.fromConnectHeader(str, header.key(), Schema.STRING_SCHEMA, new String(header.value())));
            if (z) {
                headerCall.andStubReturn(header.value());
            } else {
                headerCall.andReturn(header.value());
            }
        }

        IExpectationSetters<byte[]> keyCall = EasyMock.expect(keyConverter.fromConnectData(str, headers, KEY_SCHEMA, KEY));
        if (z) {
            keyCall.andStubReturn(SERIALIZED_KEY);
        } else {
            keyCall.andReturn(SERIALIZED_KEY);
        }

        IExpectationSetters<byte[]> valueCall = EasyMock.expect(valueConverter.fromConnectData(str, headers, RECORD_SCHEMA, RECORD));
        if (z) {
            valueCall.andStubReturn(SERIALIZED_RECORD);
        } else {
            valueCall.andReturn(SERIALIZED_RECORD);
        }
    }

    /**
     * Registers an expectation that the transformation chain returns the captured record it
     * was given.
     *
     * @param z {@code true} to register a stub, {@code false} for a one-shot expectation
     */
    private void expectApplyTransformationChain(boolean z) {
        final Capture passedRecord = EasyMock.newCapture();
        final IExpectationSetters applyCall = EasyMock.expect(this.transformationChain.apply((ConnectRecord) EasyMock.capture(passedRecord)));
        // Echo the captured argument back as the result of apply().
        final IAnswer echoInput = passedRecord::getValue;
        if (z) {
            applyCall.andStubAnswer(echoInput);
        } else {
            applyCall.andAnswer(echoInput);
        }
    }

    /**
     * Registers an expectation for {@code sourceTask.commitRecord} with any record and any
     * metadata.
     *
     * @param z  {@code true} to allow the call any number of times
     * @param z2 {@code true} if the call should succeed, {@code false} if it should throw a
     *           {@code RuntimeException}
     */
    private void expectTaskCommitRecordWithOffset(boolean z, boolean z2) throws InterruptedException {
        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class));
        IExpectationSetters<Void> commitCall = EasyMock.expectLastCall();
        if (!z2) {
            commitCall = commitCall.andThrow(new RuntimeException("Error committing record in source task"));
        }
        if (z) {
            commitCall.anyTimes();
        }
    }

    /**
     * Registers an expectation for {@code statusBackingStore.getTopic} that answers with a
     * {@code TopicStatus} built from the captured connector and topic arguments, task number 0
     * and the current system time.
     *
     * @param z {@code true} to register a stub, {@code false} for a one-shot expectation
     */
    private void expectTaskGetTopic(boolean z) {
        final Capture<String> connectorArg = EasyMock.newCapture();
        final Capture<String> topicArg = EasyMock.newCapture();
        IExpectationSetters<TopicStatus> getTopicCall = EasyMock.expect(
                statusBackingStore.getTopic(EasyMock.capture(connectorArg), EasyMock.capture(topicArg)));

        final IAnswer<TopicStatus> statusFromArgs = () -> new TopicStatus(
                topicArg.getValue(),
                new ConnectorTaskId(connectorArg.getValue(), 0),
                Time.SYSTEM.milliseconds());
        if (z) {
            getTopicCall.andStubAnswer(statusFromArgs);
        } else {
            getTopicCall.andAnswer(statusFromArgs);
        }

        // NOTE(review): both captures were created just above and the mock has presumably not
        // been replayed yet, so this check looks like it never fires here - confirm.
        if (connectorArg.hasCaptured() && topicArg.hasCaptured()) {
            Assert.assertEquals("job", connectorArg.getValue());
            Assert.assertEquals("topic", topicArg.getValue());
        }
    }

    /**
     * Waits up to five seconds for the given latch to reach zero.
     *
     * @param countDownLatch the latch to wait on
     * @return {@code true} if the latch reached zero in time; {@code false} if the wait timed
     *         out or the calling thread was interrupted
     */
    private boolean awaitLatch(CountDownLatch countDownLatch) {
        try {
            return countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers further up can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Registers the expectations for one offset flush: {@code beginFlush} returns {@code true}
     * and {@code doFlush} returns a mocked future.
     *
     * @param z {@code true} if waiting on the future should return normally (in which case
     *          {@code sourceTask.commit()} is also expected); {@code false} if it should time
     *          out (in which case {@code offsetWriter.cancelFlush()} is also expected)
     */
    @SuppressWarnings("unchecked")
    private void expectOffsetFlush(boolean z) throws Exception {
        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);

        Future<Void> flushFuture = PowerMock.createMock(Future.class);
        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(org.apache.kafka.connect.util.Callback.class)))
                .andReturn(flushFuture);

        // The result of get() is attached below; the statement order in each branch is kept
        // exactly as it was.
        IExpectationSetters<Void> futureGetCall = EasyMock.expect(
                flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
        if (z) {
            sourceTask.commit();
            EasyMock.expectLastCall();
            futureGetCall.andReturn(null);
        } else {
            futureGetCall.andThrow(new java.util.concurrent.TimeoutException());
            offsetWriter.cancelFlush();
            PowerMock.expectLastCall();
        }
    }

    /**
     * Checks the source-task and task metric groups of the worker task against a minimum
     * number of polled records.
     *
     * @param i minimum number of records expected to have been polled and written; when zero,
     *          the poll and write rates are expected to be exactly zero
     */
    private void assertPollMetrics(int i) {
        final ConnectMetrics.MetricGroup sourceGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
        final ConnectMetrics.MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
        final boolean expectRecords = i > 0;

        // Poll rate and total.
        double pollRate = metrics.currentMetricValueAsDouble(sourceGroup, "source-record-poll-rate");
        double pollTotal = metrics.currentMetricValueAsDouble(sourceGroup, "source-record-poll-total");
        if (expectRecords) {
            Assert.assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-max"), 1.0E-6d);
            Assert.assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-avg"), 1.0E-6d);
            Assert.assertTrue(pollRate > 0.0d);
        } else {
            Assert.assertEquals(0.0d, pollRate, 0.0d);
        }
        Assert.assertTrue(pollTotal >= ((double) i));

        // Write rate and total.
        double writeRate = metrics.currentMetricValueAsDouble(sourceGroup, "source-record-write-rate");
        double writeTotal = metrics.currentMetricValueAsDouble(sourceGroup, "source-record-write-total");
        if (expectRecords) {
            Assert.assertTrue(writeRate > 0.0d);
        } else {
            Assert.assertEquals(0.0d, writeRate, 0.0d);
        }
        Assert.assertTrue(writeTotal >= ((double) i));

        // Poll batch timings; the average may be NaN.
        double pollBatchMax = metrics.currentMetricValueAsDouble(sourceGroup, "poll-batch-max-time-ms");
        double pollBatchAvg = metrics.currentMetricValueAsDouble(sourceGroup, "poll-batch-avg-time-ms");
        if (expectRecords) {
            Assert.assertTrue(pollBatchMax >= 0.0d);
        }
        Assert.assertTrue(Double.isNaN(pollBatchAvg) || pollBatchAvg > 0.0d);

        // Active record counts.
        double activeCount = metrics.currentMetricValueAsDouble(sourceGroup, "source-record-active-count");
        double activeCountMax = metrics.currentMetricValueAsDouble(sourceGroup, "source-record-active-count-max");
        Assert.assertEquals(0.0d, activeCount, 1.0E-6d);
        if (expectRecords) {
            Assert.assertEquals(RECORDS.size(), activeCountMax, 1.0E-6d);
        }
    }

    /**
     * Builds a fresh {@link RecordHeaders} instance using its no-argument constructor.
     *
     * @return a new {@code RecordHeaders} object; a distinct instance on every call
     */
    private RecordHeaders emptyHeaders() {
        final RecordHeaders headers = new RecordHeaders();
        return headers;
    }

    /**
     * Records the close expectations on the mocks: the producer and the admin are each closed
     * with any {@link Duration}, and the transformation chain is closed.
     */
    private void expectClose() {
        // Producer: close(Duration) with an arbitrary timeout.
        this.producer.close(EasyMock.anyObject(Duration.class));
        EasyMock.expectLastCall();

        // Admin: close(Duration) with an arbitrary timeout.
        this.admin.close(EasyMock.anyObject(Duration.class));
        EasyMock.expectLastCall();

        // Transformation chain: plain close().
        this.transformationChain.close();
        EasyMock.expectLastCall();
    }

    /**
     * Records the admin expectations for creating the given topic. Nothing is recorded when
     * {@code config.topicCreationEnable()} returns {@code false}.
     *
     * @param str name of the topic passed to {@code describeTopics} and {@code createdTopic}
     */
    private void expectTopicCreation(String str) {
        if (!this.config.topicCreationEnable()) {
            return;
        }
        // The describe call answers with an empty map.
        EasyMock.expect(this.admin.describeTopics(new String[]{str}))
                .andReturn(Collections.emptyMap());
        // The explicit array and type witness keep the capture matcher typed as a single
        // NewTopic element, so inference cannot bind it to the array parameter itself.
        EasyMock.expect(this.admin.createOrFindTopics(new NewTopic[]{EasyMock.<NewTopic>capture(EasyMock.newCapture())}))
                .andReturn(createdTopic(str));
    }

    /**
     * Executes the worker task with a mocked {@link Tracer} installed and verifies the
     * expectations recorded on that tracer. The mocked {@code writeTraceRecords} call stops
     * the worker task.
     */
    @Test
    @Confluent
    public void testTraceInvocation() throws Exception {
        createWorkerTask(TargetState.STARTED);
        this.workerTask.initialize(TASK_CONFIG);

        // Tracing context returned by the tracer; one processed-record increment is recorded.
        TracingContext mockContext = EasyMock.mock(TracingContext.class);
        mockContext.incrementProcessedRecordCount();
        EasyMock.expectLastCall().once();

        // Tracer: each method is expected exactly once; writing trace records stops the task.
        Tracer mockTracer = EasyMock.mock(Tracer.class);
        this.workerTask.useTracer(mockTracer);
        EasyMock.expect(mockTracer.buildRecords()).andReturn(Collections.emptyList()).once();
        EasyMock.expect(mockTracer.tracingContext()).andReturn(mockContext).once();
        EasyMock.expect(mockTracer.writeTraceRecords((List) EasyMock.anyObject(), (Callback) EasyMock.anyObject())).andAnswer(() -> {
            this.workerTask.stop();
            return null;
        }).once();

        // Source task: initialize and start, any number of commits, one poll returning RECORDS.
        this.sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        this.sourceTask.start(TASK_PROPS);
        EasyMock.expectLastCall();
        this.sourceTask.commit();
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect(this.sourceTask.poll()).andReturn(RECORDS).once();

        EasyMock.replay(mockTracer, mockContext, this.sourceTask);
        this.workerTask.execute();

        // NOTE(review): only the tracer is verified, so the once() expectation on the tracing
        // context is not enforced here - confirm whether that is intentional.
        EasyMock.verify(mockTracer);
    }

    static {
        TASK_PROPS.put("task.class", TestSourceTask.class.getName());
        TASK_CONFIG = new TaskConfig(TASK_PROPS);
        RECORDS = Arrays.asList(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
    }
}
