package org.apache.kafka.connect.runtime;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* Drives WorkerSinkTask.iteration() against Mockito mocks of the consumer, sink task and converters to test polling, offset commits, pause/resume and offset rewinds. */
public class WorkerSinkTaskThreadedTest {
    // Partition, offset and payload values shared by every test case.
    private static final int PARTITION = 12;
    private static final long FIRST_OFFSET = 45;
    private static final int KEY = 12;
    private static final String VALUE = "VALUE";
    private static final long TIMESTAMP = 42;

    // Blank final: assigned in the static initializer of this class.
    private static final TaskConfig TASK_CONFIG;

    // Mock clock and metrics registry; both are recreated in setup().
    private Time time;
    private ConnectMetrics metrics;

    @Mock
    private SinkTask sinkTask;

    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private Converter keyConverter;

    @Mock
    private Converter valueConverter;

    @Mock
    private HeaderConverter headerConverter;

    @Mock
    private TransformationChain<SinkRecord> transformationChain;

    // The task under test; rebuilt in setup() from the mocks in this class.
    private WorkerSinkTask workerTask;

    @Mock
    private KafkaConsumer<byte[], byte[]> consumer;

    @Mock
    private TaskStatus.Listener statusListener;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private ErrorHandlingMetrics errorHandlingMetrics;

    // Count of data records handed out so far by the stubbed consumer.poll().
    private long recordsReturned;

    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
    private static final byte[] RAW_KEY = "key".getBytes();
    private static final byte[] RAW_VALUE = "value".getBytes();
    private static final String TOPIC = "test";

    // The three partitions that make up the initial assignment.
    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
    private static final int PARTITION2 = 13;
    private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
    private static final int PARTITION3 = 14;
    private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);

    // A partition that is never part of INITIAL_ASSIGNMENT.
    private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
    private static final Set<TopicPartition> INITIAL_ASSIGNMENT =
        new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));

    // Filled in by the static initializer before TASK_CONFIG is built from it.
    private static final Map<String, String> TASK_PROPS = new HashMap<>();
    private static final TimestampType TIMESTAMP_TYPE = TimestampType.CREATE_TIME;

    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private final TargetState initialState = TargetState.STARTED;

    // Captors for the context handed to SinkTask.initialize() and the listener handed to consumer.subscribe().
    private final ArgumentCaptor<WorkerSinkTaskContext> sinkTaskContext = ArgumentCaptor.forClass(WorkerSinkTaskContext.class);
    private final ArgumentCaptor<ConsumerRebalanceListener> rebalanceListener = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);

    // Builds an offsets map: TOPIC_PARTITION at FIRST_OFFSET + the given count,
    // the other two assigned partitions at FIRST_OFFSET.
    private final Function<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommitFn = expectedMessages -> {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + expectedMessages));
        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        offsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
        return offsets;
    };

    /**
     * Describes how one stubbed offset commit should behave. Instances are consumed,
     * in order, by the answers installed in {@code expectOffsetCommit}.
     */
    public static class ExpectOffsetCommitCommand {
        /** Added to FIRST_OFFSET to form the TOPIC_PARTITION offset of the commit. */
        final long expectedMessages;
        /** When non-null, thrown from the stubbed {@code SinkTask.preCommit}. */
        final RuntimeException error;
        /** Passed as the exception argument of the commit callback; may be null. */
        final Exception consumerCommitError;
        /** Time the stubbed {@code commitAsync} sleeps on the test clock, in ms. */
        final long consumerCommitDelayMs;
        /** Whether the stubbed {@code commitAsync} invokes the commit callback at all. */
        final boolean invokeCallback;

        private ExpectOffsetCommitCommand(long expectedMessages,
                                          RuntimeException error,
                                          Exception consumerCommitError,
                                          long consumerCommitDelayMs,
                                          boolean invokeCallback) {
            this.expectedMessages = expectedMessages;
            this.error = error;
            this.consumerCommitError = consumerCommitError;
            this.consumerCommitDelayMs = consumerCommitDelayMs;
            this.invokeCallback = invokeCallback;
        }
    }

    /**
     * Abstract {@link SinkTask} subclass that is never instantiated here; only its class
     * name is used, as the "task.class" entry of TASK_PROPS.
     */
    private static abstract class TestSinkTask extends SinkTask {
        private TestSinkTask() {
        }
    }

    /**
     * Creates a fresh mock clock, metrics registry and {@link WorkerSinkTask} wired to the
     * Mockito mocks of this class, and resets the returned-record counter.
     */
    @Before
    public void setup() {
        time = new MockTime();
        metrics = new MockConnectMetrics();
        recordsReturned = 0L;

        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
        StandaloneConfig workerConfig = new StandaloneConfig(workerProps);

        workerTask = new WorkerSinkTask(
            taskId,
            sinkTask,
            statusListener,
            initialState,
            workerConfig,
            ClusterConfigState.EMPTY,
            metrics,
            keyConverter,
            valueConverter,
            errorHandlingMetrics,
            headerConverter,
            transformationChain,
            consumer,
            pluginLoader,
            time,
            RetryWithToleranceOperatorTest.NOOP_OPERATOR,
            (WorkerErrantRecordReporter) null, // no errant record reporter in these tests
            statusBackingStore,
            Collections::emptyList);
    }

    /** Stops the metrics registry created in setup(), if there is one. */
    @After
    public void tearDown() {
        if (metrics == null) {
            return;
        }
        metrics.stop();
    }

    /**
     * Runs one assignment iteration followed by ten data iterations and checks that the
     * sink task received one empty batch and then ten single-record batches whose offsets
     * increase from FIRST_OFFSET.
     */
    @Test
    public void testPollsInBackground() {
        expectTaskGetTopic();
        expectInitialAssignment();
        expectPolls(1L);

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // The first stubbed poll() delivers the partition assignment and no records.
        workerTask.iteration();
        verifyInitialAssignment();

        // Every further poll() returns exactly one record.
        for (int i = 0; i < 10; i++) {
            workerTask.iteration();
        }
        verifyTaskGetTopic(10);

        workerTask.stop();
        workerTask.close();
        verifyStopTask();

        // Typed captor: iterating a raw captor's values as Collection does not compile.
        @SuppressWarnings("unchecked")
        ArgumentCaptor<Collection<SinkRecord>> capturedBatches = ArgumentCaptor.forClass(Collection.class);
        Mockito.verify(sinkTask, Mockito.times(11)).put(capturedBatches.capture());

        List<Collection<SinkRecord>> batches = capturedBatches.getAllValues();
        // One empty batch plus ten data batches.
        Assert.assertEquals(11, batches.size());
        Assert.assertTrue(batches.get(0).isEmpty());

        // Check every data batch, including the last one (the previous upper bound of
        // size() - 1 left the final batch unverified).
        int offset = 0;
        for (Collection<SinkRecord> batch : batches.subList(1, batches.size())) {
            Assert.assertEquals(1, batch.size());
            for (SinkRecord actual : batch) {
                SinkRecord expectedSinkRecord = new SinkRecord(
                    TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset, TIMESTAMP, TIMESTAMP_TYPE);
                InternalSinkRecord expected = new InternalSinkRecord(
                    new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + offset, null, null), expectedSinkRecord);
                Assert.assertEquals(expected, actual);
                offset++;
            }
        }
    }

    /**
     * Runs one assignment iteration and two data iterations with a single successful
     * commit scripted, then checks that no commit failure was recorded, that three
     * batches reached the sink task, and that offsets for one consumed message were
     * passed to preCommit and commitAsync.
     */
    @Test
    public void testCommit() {
        expectTaskGetTopic();
        expectInitialAssignment();
        // Each data poll advances the mock clock by 60000 ms.
        // NOTE(review): presumably the default offset commit interval — confirm.
        expectPolls(60000L);
        expectOffsetCommit(new ExpectOffsetCommitCommand(1L, null, null, 0L, true));

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // The first stubbed poll() delivers the partition assignment and no records.
        workerTask.iteration();
        verifyInitialAssignment();

        workerTask.iteration();
        workerTask.iteration();
        Assert.assertEquals(0L, workerTask.commitFailures());

        workerTask.stop();
        workerTask.close();
        verifyTaskGetTopic(2);
        verifyStopTask();

        // Keep the captor in a local so its captured values can be inspected; the
        // decompiled code referenced an undefined variable here.
        @SuppressWarnings("unchecked")
        ArgumentCaptor<Collection<SinkRecord>> capturedBatches = ArgumentCaptor.forClass(Collection.class);
        Mockito.verify(sinkTask, Mockito.times(3)).put(capturedBatches.capture());
        Assert.assertEquals(3, capturedBatches.getAllValues().size());

        verifyOffsetCommit(1L);
    }

    /**
     * Scripts a commit whose preCommit throws a RuntimeException and checks that one
     * commit failure is counted, no commit is left in progress, and the consumer seeks
     * each assigned partition to FIRST_OFFSET.
     */
    @Test
    public void testCommitFailure() {
        expectTaskGetTopic();
        expectInitialAssignment();
        expectPolls(60000L);
        ExpectOffsetCommitCommand failingPreCommit =
            new ExpectOffsetCommitCommand(1L, new RuntimeException(), null, 0L, true);
        expectOffsetCommit(failingPreCommit);

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // Assignment iteration, then two data iterations.
        workerTask.iteration();
        verifyInitialAssignment();
        workerTask.iteration();
        workerTask.iteration();

        Assert.assertEquals(1L, workerTask.commitFailures());
        Assert.assertFalse(workerTask.isCommitting());

        workerTask.stop();
        workerTask.close();
        verifyStopTask();
        verifyTaskGetTopic(2);

        Mockito.verify(consumer).seek(TOPIC_PARTITION, FIRST_OFFSET);
        Mockito.verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET);
        Mockito.verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET);
    }

    /**
     * Scripts a successful commit followed by one whose preCommit throws, and checks
     * that the consumer seeks TOPIC_PARTITION to FIRST_OFFSET + 1 and the other two
     * partitions to FIRST_OFFSET, with exactly one commit failure counted.
     */
    @Test
    public void testCommitSuccessFollowedByFailure() {
        expectTaskGetTopic();
        expectInitialAssignment();
        expectPolls(60000L);
        ExpectOffsetCommitCommand succeeds = new ExpectOffsetCommitCommand(1L, null, null, 0L, true);
        ExpectOffsetCommitCommand thenFails = new ExpectOffsetCommitCommand(2L, new RuntimeException(), null, 0L, true);
        expectOffsetCommit(succeeds, thenFails);

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // Assignment iteration, then three data iterations.
        workerTask.iteration();
        verifyInitialAssignment();
        for (int i = 0; i < 3; i++) {
            workerTask.iteration();
        }

        Mockito.verify(consumer).seek(TOPIC_PARTITION, FIRST_OFFSET + 1);
        Mockito.verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET);
        Mockito.verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET);
        Assert.assertEquals(1L, workerTask.commitFailures());
        Assert.assertFalse(workerTask.isCommitting());

        workerTask.stop();
        workerTask.close();
        verifyStopTask();
        verifyTaskGetTopic(3);
    }

    /**
     * Scripts a commit whose callback is completed with an Exception and checks that one
     * commit failure is counted and no commit is left in progress.
     */
    @Test
    public void testCommitConsumerFailure() {
        expectTaskGetTopic();
        expectInitialAssignment();
        expectPolls(60000L);
        ExpectOffsetCommitCommand callbackFails =
            new ExpectOffsetCommitCommand(1L, null, new Exception(), 0L, true);
        expectOffsetCommit(callbackFails);

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // Assignment iteration, then two data iterations.
        workerTask.iteration();
        verifyInitialAssignment();
        workerTask.iteration();
        workerTask.iteration();

        Assert.assertEquals(1L, workerTask.commitFailures());
        Assert.assertFalse(workerTask.isCommitting());

        workerTask.stop();
        workerTask.close();
        verifyStopTask();
        verifyTaskGetTopic(2);
    }

    /**
     * Scripts a commit whose stubbed commitAsync sleeps 5000 ms on the mock clock and
     * never invokes the callback, then checks that one commit failure is counted and no
     * commit is left in progress after four data iterations.
     */
    @Test
    public void testCommitTimeout() {
        expectTaskGetTopic();
        expectInitialAssignment();
        // Each data poll advances the mock clock by 30000 ms.
        expectPolls(30000L);
        // NOTE(review): 5000 ms presumably corresponds to the commit timeout — confirm.
        ExpectOffsetCommitCommand neverCompletes =
            new ExpectOffsetCommitCommand(2L, null, null, 5000L, false);
        expectOffsetCommit(neverCompletes);

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // Assignment iteration, then four data iterations.
        workerTask.iteration();
        verifyInitialAssignment();
        for (int i = 0; i < 4; i++) {
            workerTask.iteration();
        }

        Assert.assertEquals(1L, workerTask.commitFailures());
        Assert.assertFalse(workerTask.isCommitting());

        workerTask.stop();
        workerTask.close();
        verifyStopTask();
        verifyTaskGetTopic(4);
    }

    /**
     * Uses successive put() calls to inspect the task context: the second call checks
     * the reported assignment, the third pauses partitions and the fourth resumes them.
     * Pausing or resuming an unassigned partition must raise a ConnectException.
     */
    @Test
    public void testAssignmentPauseResume() {
        expectTaskGetTopic();
        expectInitialAssignment();
        expectPolls(60000L);

        Mockito.doAnswer(invocation -> null)
            .doAnswer(invocation -> {
                Set<TopicPartition> expectedAssignment =
                    new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
                Assert.assertEquals(expectedAssignment, sinkTaskContext.getValue().assignment());
                return null;
            })
            .doAnswer(invocation -> {
                try {
                    sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
                    Assert.fail("Trying to pause unassigned partition should have thrown an Connect exception");
                } catch (ConnectException expected) {
                    // Expected: the stubbed consumer rejects the unassigned partition.
                }
                sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2);
                return null;
            })
            .doAnswer(invocation -> {
                try {
                    sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
                    Assert.fail("Trying to resume unassigned partition should have thrown an Connect exception");
                } catch (ConnectException expected) {
                    // Expected: the stubbed consumer rejects the unassigned partition.
                }
                sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2);
                return null;
            })
            .when(sinkTask).put(ArgumentMatchers.any(Collection.class));

        // The consumer throws for the unassigned partition and accepts the assigned pair.
        Mockito.doThrow(new IllegalStateException("unassigned topic partition"))
            .when(consumer).pause(Collections.singletonList(UNASSIGNED_TOPIC_PARTITION));
        Mockito.doAnswer(invocation -> null)
            .when(consumer).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
        Mockito.doThrow(new IllegalStateException("unassigned topic partition"))
            .when(consumer).resume(Collections.singletonList(UNASSIGNED_TOPIC_PARTITION));
        Mockito.doAnswer(invocation -> null)
            .when(consumer).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // Assignment iteration, then three data iterations (one per scripted put() answer).
        workerTask.iteration();
        verifyInitialAssignment();
        for (int i = 0; i < 3; i++) {
            workerTask.iteration();
        }

        workerTask.stop();
        workerTask.close();
        verifyStopTask();
        verifyTaskGetTopic(3);

        Mockito.verify(consumer, Mockito.atLeastOnce()).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
        Mockito.verify(consumer, Mockito.atLeastOnce()).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
    }

    /**
     * Has the sink task request offset 40 for TOPIC_PARTITION from inside put(), then
     * checks that the context's pending offsets are empty on the next put() and that the
     * consumer was asked to seek to that offset.
     */
    @Test
    public void testRewind() {
        expectTaskGetTopic();
        expectInitialAssignment();
        expectPolls(60000L);

        final long rewindOffset = 40L;
        Map<TopicPartition, Long> requestedOffsets = new HashMap<>();
        Mockito.doAnswer(invocation -> null)
            .doAnswer(invocation -> {
                // Second put(): ask for a rewind through the task context.
                requestedOffsets.put(TOPIC_PARTITION, rewindOffset);
                sinkTaskContext.getValue().offset(requestedOffsets);
                return null;
            })
            .doAnswer(invocation -> {
                // Third put(): the requested offsets must no longer be pending.
                Assert.assertEquals(0L, sinkTaskContext.getValue().offsets().size());
                return null;
            })
            .when(sinkTask).put(ArgumentMatchers.any(Collection.class));

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // Assignment iteration, then two data iterations.
        workerTask.iteration();
        verifyInitialAssignment();
        workerTask.iteration();
        workerTask.iteration();

        Mockito.verify(consumer).seek(TOPIC_PARTITION, rewindOffset);

        workerTask.stop();
        workerTask.close();
        verifyStopTask();
        verifyTaskGetTopic(2);
    }

    /**
     * Requests offset 40 for TOPIC_PARTITION and triggers a rebalance from inside the
     * second poll(), then checks that the pending offsets are empty when put() runs and
     * that the consumer was asked to seek to that offset.
     */
    @Test
    public void testRewindOnRebalanceDuringPoll() {
        final long rewindOffset = 40L;

        expectTaskGetTopic();
        expectInitialAssignment();
        expectRebalanceDuringPoll(rewindOffset);

        Mockito.doAnswer(invocation -> null)
            .doAnswer(invocation -> {
                Assert.assertEquals(0L, sinkTaskContext.getValue().offsets().size());
                return null;
            })
            .when(sinkTask).put(ArgumentMatchers.any(Collection.class));

        workerTask.initialize(TASK_CONFIG);
        workerTask.initializeAndStart();
        verifyInitializeTask();

        // Assignment iteration, then the single iteration that rebalances mid-poll.
        workerTask.iteration();
        verifyInitialAssignment();
        workerTask.iteration();

        Mockito.verify(consumer).seek(TOPIC_PARTITION, rewindOffset);

        workerTask.stop();
        workerTask.close();
        verifyStopTask();
        verifyTaskGetTopic(1);
    }

    /**
     * Verifies that the consumer subscribed to TOPIC and that the sink task was
     * initialized and started with TASK_PROPS, capturing the rebalance listener and the
     * task context for later use.
     */
    private void verifyInitializeTask() {
        List<String> expectedTopics = Collections.singletonList(TOPIC);
        Mockito.verify(consumer).subscribe(ArgumentMatchers.eq(expectedTopics), rebalanceListener.capture());
        Mockito.verify(sinkTask).initialize(sinkTaskContext.capture());
        Mockito.verify(sinkTask).start(TASK_PROPS);
    }

    /**
     * Stubs the consumer to report INITIAL_ASSIGNMENT as its assignment and FIRST_OFFSET
     * as the position of every partition in it.
     */
    private void expectInitialAssignment() {
        Mockito.when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
        for (TopicPartition partition : INITIAL_ASSIGNMENT) {
            Mockito.when(consumer.position(partition)).thenReturn(FIRST_OFFSET);
        }
    }

    /**
     * Verifies that the sink task was opened with INITIAL_ASSIGNMENT and received one
     * empty batch.
     */
    private void verifyInitialAssignment() {
        Mockito.verify(sinkTask).open(INITIAL_ASSIGNMENT);
        Mockito.verify(sinkTask).put(Collections.emptyList());
    }

    /**
     * Verifies the shutdown interactions: the sink task was stopped, the consumer was
     * woken up and closed, and the header converter was closed.
     */
    private void verifyStopTask() {
        Mockito.verify(sinkTask).stop();
        Mockito.verify(consumer).wakeup();
        Mockito.verify(consumer).close();
        try {
            Mockito.verify(headerConverter).close();
        } catch (IOException closeFailure) {
            // close() declares IOException, so it has to be handled even on a mock.
            throw new RuntimeException(closeFailure);
        }
    }

    /**
     * Stubs consumer.poll(): the first call delivers INITIAL_ASSIGNMENT to the captured
     * rebalance listener and returns no records; every later call advances the mock
     * clock and returns one record for TOPIC_PARTITION at the next offset. Also stubs
     * the key/value converters and makes the transformation chain return its argument.
     *
     * @param pollDelayMs time the mock clock is advanced on each data poll, in ms
     */
    private void expectPolls(long pollDelayMs) {
        Mockito.when(consumer.poll(ArgumentMatchers.any(Duration.class)))
            .thenAnswer(invocation -> {
                rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
                return ConsumerRecords.empty();
            })
            .thenAnswer(invocation -> {
                // "Sleep" so that time progresses between polls.
                time.sleep(pollDelayMs);
                ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                    TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
                    0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), Optional.empty());
                ConsumerRecords<byte[], byte[]> batch = new ConsumerRecords<>(
                    Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Collections.singletonList(record)));
                recordsReturned++;
                return batch;
            });

        SchemaAndValue convertedKey = new SchemaAndValue(KEY_SCHEMA, KEY);
        SchemaAndValue convertedValue = new SchemaAndValue(VALUE_SCHEMA, VALUE);
        Mockito.when(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).thenReturn(convertedKey);
        Mockito.when(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).thenReturn(convertedValue);
        Mockito.when(transformationChain.apply(ArgumentMatchers.any(SinkRecord.class)))
            .thenAnswer(AdditionalAnswers.returnsFirstArg());
    }

    /**
     * Stubs consumer.poll(): the first call delivers INITIAL_ASSIGNMENT and returns no
     * records; later calls advance the mock clock by 1 ms, request the given offset for
     * TOPIC_PARTITION through the task context, signal another assignment of the same
     * three partitions, and return one record. Also stubs the key/value converters.
     *
     * @param startOffset offset requested for TOPIC_PARTITION before the rebalance
     */
    private void expectRebalanceDuringPoll(long startOffset) {
        List<TopicPartition> reassigned = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3);
        Map<TopicPartition, Long> requestedOffsets = new HashMap<>();
        requestedOffsets.put(TOPIC_PARTITION, startOffset);

        Mockito.when(consumer.poll(ArgumentMatchers.any(Duration.class)))
            .thenAnswer(invocation -> {
                rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
                return ConsumerRecords.empty();
            })
            .thenAnswer(invocation -> {
                // "Sleep" so that time progresses.
                time.sleep(1L);

                sinkTaskContext.getValue().offset(requestedOffsets);
                rebalanceListener.getValue().onPartitionsAssigned(reassigned);

                ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                    TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
                    0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), Optional.empty());
                ConsumerRecords<byte[], byte[]> batch = new ConsumerRecords<>(
                    Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Collections.singletonList(record)));
                recordsReturned++;
                return batch;
            });

        SchemaAndValue convertedKey = new SchemaAndValue(KEY_SCHEMA, KEY);
        SchemaAndValue convertedValue = new SchemaAndValue(VALUE_SCHEMA, VALUE);
        Mockito.when(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).thenReturn(convertedKey);
        Mockito.when(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).thenReturn(convertedValue);
    }

    /**
     * Stubs SinkTask.preCommit and consumer.commitAsync so that the n-th call of each
     * follows the n-th given command. preCommit throws the command's error if it has
     * one, otherwise returns the offsets for its expected message count. commitAsync
     * sleeps the command's delay on the mock clock and, unless the callback is
     * suppressed, completes it with the same offsets and the command's commit error.
     *
     * @param commands scripted commits, in the order they are expected to happen
     */
    private void expectOffsetCommit(final ExpectOffsetCommitCommand... commands) {
        // Single-element arrays act as mutable call counters inside the lambdas.
        final int[] preCommitCalls = {0};
        Mockito.doAnswer(invocation -> {
            ExpectOffsetCommitCommand command = commands[preCommitCalls[0]++];
            Map<TopicPartition, OffsetAndMetadata> offsets = offsetsToCommitFn.apply(command.expectedMessages);
            if (command.error != null) {
                throw command.error;
            }
            return offsets;
        }).when(sinkTask).preCommit(ArgumentMatchers.anyMap());

        final int[] commitCalls = {0};
        Mockito.doAnswer(invocation -> {
            ExpectOffsetCommitCommand command = commands[commitCalls[0]++];
            time.sleep(command.consumerCommitDelayMs);
            if (command.invokeCallback) {
                OffsetCommitCallback callback = invocation.getArgument(1);
                callback.onComplete(offsetsToCommitFn.apply(command.expectedMessages), command.consumerCommitError);
            }
            return null;
        }).when(consumer).commitAsync(ArgumentMatchers.anyMap(), ArgumentMatchers.any(OffsetCommitCallback.class));
    }

    /**
     * Verifies that preCommit and commitAsync were each called with TOPIC_PARTITION at
     * FIRST_OFFSET plus the given count and the other two partitions at FIRST_OFFSET.
     *
     * @param expectedMessages number of messages expected to be covered by the commit
     */
    private void verifyOffsetCommit(long expectedMessages) {
        // offsetsToCommitFn produces exactly the map described above.
        Map<TopicPartition, OffsetAndMetadata> expectedOffsets = offsetsToCommitFn.apply(expectedMessages);
        Mockito.verify(sinkTask).preCommit(expectedOffsets);
        Mockito.verify(consumer).commitAsync(
            ArgumentMatchers.eq(expectedOffsets), ArgumentMatchers.any(OffsetCommitCallback.class));
    }

    /**
     * Stubs StatusBackingStore.getTopic to return a TopicStatus built from the requested
     * connector (first argument) and topic (second argument), for task 0 of that
     * connector, stamped with the current system time.
     */
    private void expectTaskGetTopic() {
        Mockito.when(statusBackingStore.getTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()))
            .thenAnswer(invocation -> {
                String topic = invocation.getArgument(1, String.class);
                String connector = invocation.getArgument(0, String.class);
                ConnectorTaskId task = new ConnectorTaskId(connector, 0);
                return new TopicStatus(topic, task, Time.SYSTEM.milliseconds());
            });
    }

    /**
     * Verifies that StatusBackingStore.getTopic was called the given number of times and
     * that the last call was for connector "job" and topic TOPIC.
     *
     * @param expectedCalls exact number of getTopic invocations expected
     */
    private void verifyTaskGetTopic(int expectedCalls) {
        ArgumentCaptor<String> connectorName = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor<String> topicName = ArgumentCaptor.forClass(String.class);
        Mockito.verify(statusBackingStore, Mockito.times(expectedCalls))
            .getTopic(connectorName.capture(), topicName.capture());

        // getValue() yields the arguments of the most recent captured call.
        Assert.assertEquals("job", connectorName.getValue());
        Assert.assertEquals(TOPIC, topicName.getValue());
    }

    /** Returns a newly constructed {@link RecordHeaders} instance on every call. */
    private RecordHeaders emptyHeaders() {
        return new RecordHeaders();
    }

    // Fills TASK_PROPS with the topic list and task class, then builds TASK_CONFIG from
    // it. TASK_CONFIG is a blank final, so this is its only assignment; the block relies
    // on the TASK_PROPS field initializer having already run (it appears earlier in the class).
    static {
        TASK_PROPS.put("topics", TOPIC);
        TASK_PROPS.put("task.class", TestSinkTask.class.getName());
        TASK_CONFIG = new TaskConfig(TASK_PROPS);
    }
}
