package org.apache.flink.streaming.connectors.kafka;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.class */
public class FlinkKafkaConsumerBaseTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$DummyFlinkKafkaConsumer.class */
    public static final class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 1;

        public DummyFlinkKafkaConsumer() {
            super((KeyedDeserializationSchema) Mockito.mock(KeyedDeserializationSchema.class));
        }

        protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> list, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext) throws Exception {
            return null;
        }
    }

    @Test
    public void testEitherWatermarkExtractor() {
        try {
            new DummyFlinkKafkaConsumer().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
            Assert.fail();
        } catch (NullPointerException e) {
        }
        try {
            new DummyFlinkKafkaConsumer().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
            Assert.fail();
        } catch (NullPointerException e2) {
        }
        AssignerWithPeriodicWatermarks assignerWithPeriodicWatermarks = (AssignerWithPeriodicWatermarks) Mockito.mock(AssignerWithPeriodicWatermarks.class);
        AssignerWithPunctuatedWatermarks assignerWithPunctuatedWatermarks = (AssignerWithPunctuatedWatermarks) Mockito.mock(AssignerWithPunctuatedWatermarks.class);
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer();
        dummyFlinkKafkaConsumer.assignTimestampsAndWatermarks(assignerWithPeriodicWatermarks);
        try {
            dummyFlinkKafkaConsumer.assignTimestampsAndWatermarks(assignerWithPunctuatedWatermarks);
            Assert.fail();
        } catch (IllegalStateException e3) {
        }
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer2 = new DummyFlinkKafkaConsumer();
        dummyFlinkKafkaConsumer2.assignTimestampsAndWatermarks(assignerWithPunctuatedWatermarks);
        try {
            dummyFlinkKafkaConsumer2.assignTimestampsAndWatermarks(assignerWithPeriodicWatermarks);
            Assert.fail();
        } catch (IllegalStateException e4) {
        }
    }

    @Test
    public void ignoreCheckpointWhenNotRunning() throws Exception {
        FlinkKafkaConsumerBase consumer = getConsumer((AbstractFetcher) Mockito.mock(AbstractFetcher.class), new LinkedMap(), false);
        Assert.assertNull(consumer.snapshotState(17L, 23L));
        consumer.notifyCheckpointComplete(66L);
    }

    @Test
    public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(new KafkaTopicPartition("abc", 13), 16768L);
        hashMap.put(new KafkaTopicPartition("def", 7), 987654321L);
        FlinkKafkaConsumerBase consumer = getConsumer(null, new LinkedMap(), true);
        consumer.restoreState(hashMap);
        Assert.assertEquals(hashMap, consumer.snapshotState(17L, 23L));
    }

    @Test
    public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
        Assert.assertNull(getConsumer(null, new LinkedMap(), true).snapshotState(17L, 23L));
    }

    @Test
    public void testSnapshotState() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(new KafkaTopicPartition("abc", 13), 16768L);
        hashMap.put(new KafkaTopicPartition("def", 7), 987654321L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new KafkaTopicPartition("abc", 13), 16770L);
        hashMap2.put(new KafkaTopicPartition("def", 7), 987654329L);
        HashMap hashMap3 = new HashMap();
        hashMap2.put(new KafkaTopicPartition("abc", 13), 16780L);
        hashMap2.put(new KafkaTopicPartition("def", 7), 987654377L);
        AbstractFetcher abstractFetcher = (AbstractFetcher) Mockito.mock(AbstractFetcher.class);
        Mockito.when(abstractFetcher.snapshotCurrentState()).thenReturn(hashMap, new HashMap[]{hashMap2, hashMap3});
        LinkedMap linkedMap = new LinkedMap();
        FlinkKafkaConsumerBase consumer = getConsumer(abstractFetcher, linkedMap, true);
        Assert.assertEquals(0L, linkedMap.size());
        Assert.assertEquals(hashMap, consumer.snapshotState(138L, 19L));
        Assert.assertEquals(1L, linkedMap.size());
        Assert.assertEquals(hashMap, linkedMap.get(138L));
        Assert.assertEquals(hashMap2, consumer.snapshotState(140L, 1578L));
        Assert.assertEquals(2L, linkedMap.size());
        Assert.assertEquals(hashMap2, linkedMap.get(140L));
        consumer.notifyCheckpointComplete(138L);
        Assert.assertEquals(1L, linkedMap.size());
        Assert.assertTrue(linkedMap.containsKey(140L));
        Assert.assertEquals(hashMap3, consumer.snapshotState(141L, 1578L));
        Assert.assertEquals(2L, linkedMap.size());
        Assert.assertEquals(hashMap3, linkedMap.get(141L));
        consumer.notifyCheckpointComplete(141L);
        Assert.assertEquals(0L, linkedMap.size());
        consumer.notifyCheckpointComplete(666L);
        Assert.assertEquals(0L, linkedMap.size());
        for (int i = 100; i < 600; i++) {
            consumer.snapshotState(i, 15 * i);
        }
        Assert.assertEquals(100L, linkedMap.size());
        consumer.notifyCheckpointComplete(598L);
        Assert.assertEquals(1L, linkedMap.size());
        consumer.notifyCheckpointComplete(590L);
        consumer.notifyCheckpointComplete(599L);
        Assert.assertEquals(0L, linkedMap.size());
    }

    private static <T> FlinkKafkaConsumerBase<T> getConsumer(AbstractFetcher<T, ?> abstractFetcher, LinkedMap linkedMap, boolean z) throws Exception {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer();
        Field declaredField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
        declaredField.setAccessible(true);
        declaredField.set(dummyFlinkKafkaConsumer, abstractFetcher);
        Field declaredField2 = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
        declaredField2.setAccessible(true);
        declaredField2.set(dummyFlinkKafkaConsumer, linkedMap);
        Field declaredField3 = FlinkKafkaConsumerBase.class.getDeclaredField("running");
        declaredField3.setAccessible(true);
        declaredField3.set(dummyFlinkKafkaConsumer, Boolean.valueOf(z));
        return dummyFlinkKafkaConsumer;
    }
}
