/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class FlinkKafkaConsumerBaseTest {
    @Test
    public void testEitherWatermarkExtractor() {
        try {
            new DummyFlinkKafkaConsumer().assignTimestampsAndWatermarks(null);
            Assert.fail();
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            new DummyFlinkKafkaConsumer().assignTimestampsAndWatermarks(null);
            Assert.fail();
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        AssignerWithPeriodicWatermarks periodicAssigner = (AssignerWithPeriodicWatermarks)Mockito.mock(AssignerWithPeriodicWatermarks.class);
        AssignerWithPunctuatedWatermarks punctuatedAssigner = (AssignerWithPunctuatedWatermarks)Mockito.mock(AssignerWithPunctuatedWatermarks.class);
        DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer();
        c1.assignTimestampsAndWatermarks(periodicAssigner);
        try {
            c1.assignTimestampsAndWatermarks(punctuatedAssigner);
            Assert.fail();
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        DummyFlinkKafkaConsumer c2 = new DummyFlinkKafkaConsumer();
        c2.assignTimestampsAndWatermarks(punctuatedAssigner);
        try {
            c2.assignTimestampsAndWatermarks(periodicAssigner);
            Assert.fail();
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void ignoreCheckpointWhenNotRunning() throws Exception {
        AbstractFetcher fetcher = (AbstractFetcher)Mockito.mock(AbstractFetcher.class);
        FlinkKafkaConsumerBase consumer = FlinkKafkaConsumerBaseTest.getConsumer(fetcher, new LinkedMap(), false);
        OperatorStateStore operatorStateStore = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        TestingListState listState = new TestingListState();
        Mockito.when((Object)operatorStateStore.getListState((ListStateDescriptor)Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(1L, 1L));
        Assert.assertFalse((boolean)listState.get().iterator().hasNext());
        consumer.notifyCheckpointComplete(66L);
    }

    @Test
    public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
        OperatorStateStore operatorStateStore = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        TestingListState<Tuple2> listState = new TestingListState<Tuple2>();
        listState.add(Tuple2.of((Object)new KafkaTopicPartition("abc", 13), (Object)16768L));
        listState.add(Tuple2.of((Object)new KafkaTopicPartition("def", 7), (Object)987654321L));
        FlinkKafkaConsumerBase consumer = FlinkKafkaConsumerBaseTest.getConsumer(null, new LinkedMap(), true);
        Mockito.when((Object)operatorStateStore.getSerializableListState((String)Matchers.any(String.class))).thenReturn(listState);
        StateInitializationContext initializationContext = (StateInitializationContext)Mockito.mock(StateInitializationContext.class);
        Mockito.when((Object)initializationContext.getOperatorStateStore()).thenReturn((Object)operatorStateStore);
        Mockito.when((Object)initializationContext.isRestored()).thenReturn((Object)true);
        consumer.initializeState((FunctionInitializationContext)initializationContext);
        consumer.open(new Configuration());
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(17L, 17L));
        Assert.assertTrue((boolean)listState.isClearCalled());
        HashSet<Serializable> expected = new HashSet<Serializable>();
        Iterator i$ = listState.get().iterator();
        while (i$.hasNext()) {
            Serializable serializable = (Serializable)i$.next();
            expected.add(serializable);
        }
        int counter = 0;
        Iterator i$2 = listState.get().iterator();
        while (i$2.hasNext()) {
            Serializable serializable = (Serializable)i$2.next();
            Assert.assertTrue((boolean)expected.contains(serializable));
            ++counter;
        }
        Assert.assertEquals((long)expected.size(), (long)counter);
    }

    @Test
    public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
        FlinkKafkaConsumerBase consumer = FlinkKafkaConsumerBaseTest.getConsumer(null, new LinkedMap(), true);
        OperatorStateStore operatorStateStore = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        TestingListState listState = new TestingListState();
        Mockito.when((Object)operatorStateStore.getSerializableListState((String)Matchers.any(String.class))).thenReturn(listState);
        StateInitializationContext initializationContext = (StateInitializationContext)Mockito.mock(StateInitializationContext.class);
        Mockito.when((Object)initializationContext.getOperatorStateStore()).thenReturn((Object)operatorStateStore);
        Mockito.when((Object)initializationContext.isRestored()).thenReturn((Object)false);
        consumer.initializeState((FunctionInitializationContext)initializationContext);
        consumer.open(new Configuration());
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(17L, 17L));
        Assert.assertFalse((boolean)listState.get().iterator().hasNext());
    }

    @Test
    public void checkUseFetcherWhenNoCheckpoint() throws Exception {
        FlinkKafkaConsumerBase consumer = FlinkKafkaConsumerBaseTest.getConsumer(null, new LinkedMap(), true);
        ArrayList<KafkaTopicPartition> partitionList = new ArrayList<KafkaTopicPartition>(1);
        partitionList.add(new KafkaTopicPartition("test", 0));
        consumer.setSubscribedPartitions(partitionList);
        OperatorStateStore operatorStateStore = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        TestingListState listState = new TestingListState();
        Mockito.when((Object)operatorStateStore.getSerializableListState((String)Matchers.any(String.class))).thenReturn(listState);
        StateInitializationContext initializationContext = (StateInitializationContext)Mockito.mock(StateInitializationContext.class);
        Mockito.when((Object)initializationContext.getOperatorStateStore()).thenReturn((Object)operatorStateStore);
        Mockito.when((Object)initializationContext.isRestored()).thenReturn((Object)false);
        consumer.initializeState((FunctionInitializationContext)initializationContext);
        consumer.run((SourceFunction.SourceContext)Mockito.mock(SourceFunction.SourceContext.class));
    }

    @Test
    public void testConfigureOnCheckpointsCommitMode() {
        DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
        consumer.setIsAutoCommitEnabled(true);
        StreamingRuntimeContext context = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when((Object)context.getIndexOfThisSubtask()).thenReturn((Object)0);
        Mockito.when((Object)context.getNumberOfParallelSubtasks()).thenReturn((Object)1);
        Mockito.when((Object)context.isCheckpointingEnabled()).thenReturn((Object)true);
        consumer.setRuntimeContext((RuntimeContext)context);
        consumer.open(new Configuration());
        Assert.assertEquals((Object)OffsetCommitMode.ON_CHECKPOINTS, (Object)consumer.getOffsetCommitMode());
    }

    @Test
    public void testConfigureAutoCommitMode() {
        DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
        consumer.setIsAutoCommitEnabled(true);
        StreamingRuntimeContext context = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when((Object)context.getIndexOfThisSubtask()).thenReturn((Object)0);
        Mockito.when((Object)context.getNumberOfParallelSubtasks()).thenReturn((Object)1);
        Mockito.when((Object)context.isCheckpointingEnabled()).thenReturn((Object)false);
        consumer.setRuntimeContext((RuntimeContext)context);
        consumer.open(new Configuration());
        Assert.assertEquals((Object)OffsetCommitMode.KAFKA_PERIODIC, (Object)consumer.getOffsetCommitMode());
    }

    @Test
    public void testConfigureDisableOffsetCommitWithCheckpointing() {
        DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
        consumer.setIsAutoCommitEnabled(true);
        StreamingRuntimeContext context = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when((Object)context.getIndexOfThisSubtask()).thenReturn((Object)0);
        Mockito.when((Object)context.getNumberOfParallelSubtasks()).thenReturn((Object)1);
        Mockito.when((Object)context.isCheckpointingEnabled()).thenReturn((Object)true);
        consumer.setRuntimeContext((RuntimeContext)context);
        consumer.setCommitOffsetsOnCheckpoints(false);
        consumer.open(new Configuration());
        Assert.assertEquals((Object)OffsetCommitMode.DISABLED, (Object)consumer.getOffsetCommitMode());
    }

    @Test
    public void testConfigureDisableOffsetCommitWithoutCheckpointing() {
        DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
        consumer.setIsAutoCommitEnabled(false);
        StreamingRuntimeContext context = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when((Object)context.getIndexOfThisSubtask()).thenReturn((Object)0);
        Mockito.when((Object)context.getNumberOfParallelSubtasks()).thenReturn((Object)1);
        Mockito.when((Object)context.isCheckpointingEnabled()).thenReturn((Object)false);
        consumer.setRuntimeContext((RuntimeContext)context);
        consumer.open(new Configuration());
        Assert.assertEquals((Object)OffsetCommitMode.DISABLED, (Object)consumer.getOffsetCommitMode());
    }

    @Test
    public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
        HashMap<KafkaTopicPartition, Long> state1 = new HashMap<KafkaTopicPartition, Long>();
        state1.put(new KafkaTopicPartition("abc", 13), 16768L);
        state1.put(new KafkaTopicPartition("def", 7), 987654321L);
        HashMap<KafkaTopicPartition, Long> state2 = new HashMap<KafkaTopicPartition, Long>();
        state2.put(new KafkaTopicPartition("abc", 13), 16770L);
        state2.put(new KafkaTopicPartition("def", 7), 987654329L);
        HashMap<KafkaTopicPartition, Long> state3 = new HashMap<KafkaTopicPartition, Long>();
        state3.put(new KafkaTopicPartition("abc", 13), 16780L);
        state3.put(new KafkaTopicPartition("def", 7), 987654377L);
        AbstractFetcher fetcher = (AbstractFetcher)Mockito.mock(AbstractFetcher.class);
        Mockito.when((Object)fetcher.snapshotCurrentState()).thenReturn(state1, (Object[])new HashMap[]{state2, state3});
        LinkedMap pendingOffsetsToCommit = new LinkedMap();
        FlinkKafkaConsumerBase consumer = FlinkKafkaConsumerBaseTest.getConsumer(fetcher, pendingOffsetsToCommit, true);
        StreamingRuntimeContext mockRuntimeContext = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when((Object)mockRuntimeContext.isCheckpointingEnabled()).thenReturn((Object)true);
        consumer.setRuntimeContext((RuntimeContext)mockRuntimeContext);
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
        OperatorStateStore backend = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        TestingListState listState = new TestingListState();
        Mockito.when((Object)backend.getSerializableListState((String)Matchers.any(String.class))).thenReturn(listState);
        StateInitializationContext initializationContext = (StateInitializationContext)Mockito.mock(StateInitializationContext.class);
        Mockito.when((Object)initializationContext.getOperatorStateStore()).thenReturn((Object)backend);
        Mockito.when((Object)initializationContext.isRestored()).thenReturn((Object)false, (Object[])new Boolean[]{true, true, true});
        consumer.initializeState((FunctionInitializationContext)initializationContext);
        consumer.open(new Configuration());
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(138L, 138L));
        HashMap<Object, Object> snapshot1 = new HashMap<Object, Object>();
        Iterator i$ = listState.get().iterator();
        while (i$.hasNext()) {
            Serializable serializable = (Serializable)i$.next();
            Tuple2 kafkaTopicPartitionLongTuple2 = (Tuple2)serializable;
            snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
        }
        Assert.assertEquals(state1, snapshot1);
        Assert.assertEquals((long)1L, (long)pendingOffsetsToCommit.size());
        Assert.assertEquals(state1, (Object)pendingOffsetsToCommit.get((Object)138L));
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(140L, 140L));
        HashMap<Object, Object> snapshot2 = new HashMap<Object, Object>();
        Iterator i$2 = listState.get().iterator();
        while (i$2.hasNext()) {
            Serializable serializable = (Serializable)i$2.next();
            Tuple2 kafkaTopicPartitionLongTuple2 = (Tuple2)serializable;
            snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
        }
        Assert.assertEquals(state2, snapshot2);
        Assert.assertEquals((long)2L, (long)pendingOffsetsToCommit.size());
        Assert.assertEquals(state2, (Object)pendingOffsetsToCommit.get((Object)140L));
        consumer.notifyCheckpointComplete(138L);
        Assert.assertEquals((long)1L, (long)pendingOffsetsToCommit.size());
        Assert.assertTrue((boolean)pendingOffsetsToCommit.containsKey((Object)140L));
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(141L, 141L));
        HashMap<Object, Object> snapshot3 = new HashMap<Object, Object>();
        Iterator i$3 = listState.get().iterator();
        while (i$3.hasNext()) {
            Serializable serializable = (Serializable)i$3.next();
            Tuple2 kafkaTopicPartitionLongTuple2 = (Tuple2)serializable;
            snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
        }
        Assert.assertEquals(state3, snapshot3);
        Assert.assertEquals((long)2L, (long)pendingOffsetsToCommit.size());
        Assert.assertEquals(state3, (Object)pendingOffsetsToCommit.get((Object)141L));
        consumer.notifyCheckpointComplete(141L);
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
        consumer.notifyCheckpointComplete(666L);
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
        OperatorStateStore operatorStateStore = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        listState = new TestingListState();
        Mockito.when((Object)operatorStateStore.getListState((ListStateDescriptor)Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
        for (int i = 100; i < 600; ++i) {
            consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl((long)i, (long)i));
            listState.clear();
        }
        Assert.assertEquals((long)100L, (long)pendingOffsetsToCommit.size());
        consumer.notifyCheckpointComplete(598L);
        Assert.assertEquals((long)1L, (long)pendingOffsetsToCommit.size());
        consumer.notifyCheckpointComplete(590L);
        consumer.notifyCheckpointComplete(599L);
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
    }

    @Test
    public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception {
        HashMap<KafkaTopicPartition, Long> state1 = new HashMap<KafkaTopicPartition, Long>();
        state1.put(new KafkaTopicPartition("abc", 13), 16768L);
        state1.put(new KafkaTopicPartition("def", 7), 987654321L);
        HashMap<KafkaTopicPartition, Long> state2 = new HashMap<KafkaTopicPartition, Long>();
        state2.put(new KafkaTopicPartition("abc", 13), 16770L);
        state2.put(new KafkaTopicPartition("def", 7), 987654329L);
        HashMap<KafkaTopicPartition, Long> state3 = new HashMap<KafkaTopicPartition, Long>();
        state3.put(new KafkaTopicPartition("abc", 13), 16780L);
        state3.put(new KafkaTopicPartition("def", 7), 987654377L);
        AbstractFetcher fetcher = (AbstractFetcher)Mockito.mock(AbstractFetcher.class);
        Mockito.when((Object)fetcher.snapshotCurrentState()).thenReturn(state1, (Object[])new HashMap[]{state2, state3});
        LinkedMap pendingOffsetsToCommit = new LinkedMap();
        FlinkKafkaConsumerBase consumer = FlinkKafkaConsumerBaseTest.getConsumer(fetcher, pendingOffsetsToCommit, true);
        StreamingRuntimeContext mockRuntimeContext = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when((Object)mockRuntimeContext.isCheckpointingEnabled()).thenReturn((Object)true);
        consumer.setRuntimeContext((RuntimeContext)mockRuntimeContext);
        consumer.setCommitOffsetsOnCheckpoints(false);
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
        OperatorStateStore backend = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        TestingListState listState = new TestingListState();
        Mockito.when((Object)backend.getSerializableListState((String)Matchers.any(String.class))).thenReturn(listState);
        StateInitializationContext initializationContext = (StateInitializationContext)Mockito.mock(StateInitializationContext.class);
        Mockito.when((Object)initializationContext.getOperatorStateStore()).thenReturn((Object)backend);
        Mockito.when((Object)initializationContext.isRestored()).thenReturn((Object)false, (Object[])new Boolean[]{true, true, true});
        consumer.initializeState((FunctionInitializationContext)initializationContext);
        consumer.open(new Configuration());
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(138L, 138L));
        HashMap<Object, Object> snapshot1 = new HashMap<Object, Object>();
        Iterator i$ = listState.get().iterator();
        while (i$.hasNext()) {
            Serializable serializable = (Serializable)i$.next();
            Tuple2 kafkaTopicPartitionLongTuple2 = (Tuple2)serializable;
            snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
        }
        Assert.assertEquals(state1, snapshot1);
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(140L, 140L));
        HashMap<Object, Object> snapshot2 = new HashMap<Object, Object>();
        Iterator i$2 = listState.get().iterator();
        while (i$2.hasNext()) {
            Serializable serializable = (Serializable)i$2.next();
            Tuple2 kafkaTopicPartitionLongTuple2 = (Tuple2)serializable;
            snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
        }
        Assert.assertEquals(state2, snapshot2);
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
        consumer.notifyCheckpointComplete(138L);
        ((AbstractFetcher)Mockito.verify((Object)fetcher, (VerificationMode)Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap());
        consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(141L, 141L));
        HashMap<Object, Object> snapshot3 = new HashMap<Object, Object>();
        Iterator i$3 = listState.get().iterator();
        while (i$3.hasNext()) {
            Serializable serializable = (Serializable)i$3.next();
            Tuple2 kafkaTopicPartitionLongTuple2 = (Tuple2)serializable;
            snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
        }
        Assert.assertEquals(state3, snapshot3);
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
        consumer.notifyCheckpointComplete(141L);
        ((AbstractFetcher)Mockito.verify((Object)fetcher, (VerificationMode)Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap());
        consumer.notifyCheckpointComplete(666L);
        ((AbstractFetcher)Mockito.verify((Object)fetcher, (VerificationMode)Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap());
        OperatorStateStore operatorStateStore = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        listState = new TestingListState();
        Mockito.when((Object)operatorStateStore.getListState((ListStateDescriptor)Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
        for (int i = 100; i < 600; ++i) {
            consumer.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl((long)i, (long)i));
            listState.clear();
        }
        Assert.assertEquals((long)0L, (long)pendingOffsetsToCommit.size());
        consumer.notifyCheckpointComplete(598L);
        ((AbstractFetcher)Mockito.verify((Object)fetcher, (VerificationMode)Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap());
        consumer.notifyCheckpointComplete(590L);
        ((AbstractFetcher)Mockito.verify((Object)fetcher, (VerificationMode)Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap());
        consumer.notifyCheckpointComplete(599L);
        ((AbstractFetcher)Mockito.verify((Object)fetcher, (VerificationMode)Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap());
    }

    private static <T> FlinkKafkaConsumerBase<T> getConsumer(AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception {
        DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
        StreamingRuntimeContext mockRuntimeContext = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when((Object)mockRuntimeContext.isCheckpointingEnabled()).thenReturn((Object)true);
        consumer.setRuntimeContext((RuntimeContext)mockRuntimeContext);
        Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
        fetcherField.setAccessible(true);
        fetcherField.set((Object)consumer, fetcher);
        Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
        mapField.setAccessible(true);
        mapField.set((Object)consumer, pendingOffsetsToCommit);
        Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
        runningField.setAccessible(true);
        runningField.set((Object)consumer, running);
        return consumer;
    }

    private static final class TestingListState<T>
    implements ListState<T> {
        private final List<T> list = new ArrayList<T>();
        private boolean clearCalled = false;

        private TestingListState() {
        }

        public void clear() {
            this.list.clear();
            this.clearCalled = true;
        }

        public Iterable<T> get() throws Exception {
            return this.list;
        }

        public void add(T value) throws Exception {
            this.list.add(value);
        }

        public List<T> getList() {
            return this.list;
        }

        public boolean isClearCalled() {
            return this.clearCalled;
        }
    }

    private static class DummyFlinkKafkaConsumer<T>
    extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 1L;
        boolean isAutoCommitEnabled = false;

        public DummyFlinkKafkaConsumer() {
            super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema)Mockito.mock(KeyedDeserializationSchema.class));
        }

        protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode) throws Exception {
            return (AbstractFetcher)Mockito.mock(AbstractFetcher.class);
        }

        protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
            return Collections.emptyList();
        }

        protected boolean getIsAutoCommitEnabled() {
            return this.isAutoCommitEnabled;
        }

        public void setIsAutoCommitEnabled(boolean isAutoCommitEnabled) {
            this.isAutoCommitEnabled = isAutoCommitEnabled;
        }
    }
}

