package org.apache.flink.streaming.connectors.kafka.internals;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.class */
public class AbstractFetcherWatermarksTest {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest$PeriodicTestExtractor.class */
    /**
     * Legacy-style periodic assigner used by the parameterized suite.
     *
     * <p>Each element is used as its own timestamp, and the watermark handed out on request
     * carries the largest element value extracted so far.
     */
    private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
        /** Largest element value seen so far; {@link Long#MIN_VALUE} until the first element. */
        private volatile long maxTimestamp = Long.MIN_VALUE;

        private PeriodicTestExtractor() {
        }

        /**
         * Remembers the element if it is a new maximum and returns it as the timestamp.
         *
         * @param element the record value, which doubles as its timestamp
         * @param previousTimestamp ignored by this extractor
         * @return the unboxed value of {@code element}
         */
        public long extractTimestamp(Long element, long previousTimestamp) {
            final long highestSoFar = this.maxTimestamp;
            final long elementTimestamp = element.longValue();
            this.maxTimestamp = Math.max(highestSoFar, elementTimestamp);
            return elementTimestamp;
        }

        /**
         * Builds a watermark from the current maximum.
         *
         * @return a new watermark; this implementation never returns {@code null}
         */
        @Nullable
        public Watermark getCurrentWatermark() {
            final long highestSoFar = this.maxTimestamp;
            return new Watermark(highestSoFar);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest$PeriodicTestWatermarkGenerator.class */
    /**
     * {@link WatermarkGenerator} counterpart of the legacy periodic extractor.
     *
     * <p>It tracks the largest event value observed and, on each periodic emit, publishes a
     * watermark carrying that value.
     */
    public static class PeriodicTestWatermarkGenerator implements WatermarkGenerator<Long> {
        /** Largest event value seen so far; {@link Long#MIN_VALUE} until the first event. */
        private volatile long maxTimestamp = Long.MIN_VALUE;

        private PeriodicTestWatermarkGenerator() {
        }

        /**
         * Updates the running maximum with the event's value.
         *
         * @param event the record value, compared against the running maximum
         * @param eventTimestamp ignored; only the event value itself is used
         * @param output ignored; nothing is emitted per event
         */
        public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
            final long highestSoFar = this.maxTimestamp;
            this.maxTimestamp = Math.max(highestSoFar, event.longValue());
        }

        /**
         * Emits a watermark carrying the current maximum.
         *
         * @param output receiver of the emitted watermark
         */
        public void onPeriodicEmit(WatermarkOutput output) {
            final long highestSoFar = this.maxTimestamp;
            // Fully qualified: the file imports the legacy streaming Watermark under the same name.
            output.emitWatermark(
                    new org.apache.flink.api.common.eventtime.Watermark(highestSoFar));
        }
    }

    @RunWith(Parameterized.class)
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest$PeriodicWatermarksSuite.class */
    public static class PeriodicWatermarksSuite {

        /**
         * Watermark strategy the current test run exercises. Marked as the JUnit
         * {@code Parameterized} parameter, so it is expected to be filled with one of the values
         * returned by {@link #getParams()}.
         */
        @Parameterized.Parameter
        public WatermarkStrategy<Long> testWmStrategy;

        /**
         * Supplies the watermark strategies every test in this suite is run against.
         *
         * <p>Two variants are returned: the legacy {@link PeriodicTestExtractor} wrapped in
         * {@link AssignerWithPeriodicWatermarksAdapter.Strategy}, and a strategy built from
         * {@link PeriodicTestWatermarkGenerator} with a timestamp assigner that uses the element
         * value as the timestamp.
         *
         * @return the strategies to parameterize the suite with
         */
        @Parameterized.Parameters
        public static Collection<WatermarkStrategy<Long>> getParams() {
            // Typed locals instead of a raw Strategy: keeps the element type checked as Long.
            final WatermarkStrategy<Long> legacyAssignerStrategy =
                    new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                            new PeriodicTestExtractor());

            // The explicit <Long> witness pins the element type so the assigner lambda sees a Long
            // without relying on inference through the chained call.
            final WatermarkStrategy<Long> generatorStrategy =
                    WatermarkStrategy.<Long>forGenerator(
                                    context -> new PeriodicTestWatermarkGenerator())
                            .withTimestampAssigner(
                                    (element, recordTimestamp) -> element.longValue());

            return Arrays.asList(legacyAssignerStrategy, generatorStrategy);
        }

        /**
         * Emits records into three partitions and checks, after each advance of the test clock,
         * which watermark reaches the source context.
         *
         * <p>The expected watermarks (3, then 12, then a value in [13, 15]) always equal the
         * smallest of the per-partition maximum timestamps emitted so far. The combining logic
         * itself lives in the fetcher and is not visible in this file.
         *
         * @throws Exception if the fetcher cannot be created or emitting fails
         */
        @Test
        public void testPeriodicWatermarks() throws Exception {
            final String topic = "test topic name";
            // NOTE(review): presumably a sentinel start offset; kept verbatim - confirm its meaning.
            final long initialOffset = -915623761774L;

            final Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
            startOffsets.put(new KafkaTopicPartition(topic, 7), initialOffset);
            startOffsets.put(new KafkaTopicPartition(topic, 13), initialOffset);
            startOffsets.put(new KafkaTopicPartition(topic, 21), initialOffset);

            final TestSourceContext<Long> sourceContext = new TestSourceContext<>();
            final TestProcessingTimeService timeService = new TestProcessingTimeService();
            final TestFetcher<Long> fetcher =
                    new TestFetcher<>(
                            sourceContext,
                            startOffsets,
                            new SerializedValue<>(this.testWmStrategy),
                            timeService,
                            10L);

            final KafkaTopicPartitionState<Long, Object> part1 =
                    fetcher.subscribedPartitionStates().get(0);
            final KafkaTopicPartitionState<Long, Object> part2 =
                    fetcher.subscribedPartitionStates().get(1);
            final KafkaTopicPartitionState<Long, Object> part3 =
                    fetcher.subscribedPartitionStates().get(2);

            // Partition 1: maximum timestamp 3.
            emitRecord(fetcher, 1L, part1, 1L);
            emitRecord(fetcher, 1L, part1, 1L);
            emitRecord(fetcher, 2L, part1, 2L);
            emitRecord(fetcher, 3L, part1, 3L);
            Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());

            // Partition 2: maximum timestamp 12.
            emitRecord(fetcher, 12L, part2, 1L);
            Assert.assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(12L, sourceContext.getLatestElement().getTimestamp());

            // Partition 3: maximum timestamp 102.
            emitRecord(fetcher, 101L, part3, 1L);
            emitRecord(fetcher, 102L, part3, 2L);
            Assert.assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(102L, sourceContext.getLatestElement().getTimestamp());

            // Clock reaches 10 (the fetcher was built with 10L as its last argument):
            // the expected watermark is min(3, 12, 102) = 3.
            timeService.setCurrentTime(10L);
            Assert.assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());

            // Advance partition 3 to 1005.
            emitRecord(fetcher, 1003L, part3, 3L);
            emitRecord(fetcher, 1004L, part3, 4L);
            emitRecord(fetcher, 1005L, part3, 5L);
            Assert.assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());

            // Advance partition 1 to 30; partition 2 (still at 12) now holds the minimum.
            emitRecord(fetcher, 30L, part1, 4L);
            Assert.assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(30L, sourceContext.getLatestElement().getTimestamp());

            timeService.setCurrentTime(20L);
            Assert.assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());

            // Advance partition 2 to 15.
            emitRecord(fetcher, 13L, part2, 2L);
            emitRecord(fetcher, 14L, part2, 3L);
            emitRecord(fetcher, 15L, part2, 3L);

            timeService.setCurrentTime(30L);
            final long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
            Assert.assertTrue(
                    "expected a watermark in [13, 15] but got " + watermarkTs,
                    watermarkTs >= 13L && watermarkTs <= 15L);
        }

        /**
         * Checks that emitting an empty batch (standing in for a skipped, corrupted record)
         * advances the partition offset without collecting an element and without producing a new
         * watermark on the next clock advance.
         *
         * @throws Exception if the fetcher cannot be created or emitting fails
         */
        @Test
        public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception {
            // NOTE(review): presumably a sentinel start offset; kept verbatim - confirm its meaning.
            final long initialOffset = -915623761774L;

            final Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
            startOffsets.put(new KafkaTopicPartition("test topic name", 1), initialOffset);

            final TestSourceContext<Long> sourceContext = new TestSourceContext<>();
            final TestProcessingTimeService timeService = new TestProcessingTimeService();
            final TestFetcher<Long> fetcher =
                    new TestFetcher<>(
                            sourceContext,
                            startOffsets,
                            new SerializedValue<>(this.testWmStrategy),
                            timeService,
                            10L);

            final KafkaTopicPartitionState<Long, Object> partitionState =
                    fetcher.subscribedPartitionStates().get(0);

            // Three regular records: element, timestamp and offset all end at 3.
            emitRecord(fetcher, 1L, partitionState, 1L);
            emitRecord(fetcher, 2L, partitionState, 2L);
            emitRecord(fetcher, 3L, partitionState, 3L);
            Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
            Assert.assertEquals(3L, partitionState.getOffset());

            // Advancing the clock is expected to yield a watermark of 3.
            // NOTE(review): the later assertFalse relies on getLatestWatermark() consuming the
            // pending watermark - confirm against TestSourceContext.
            timeService.setCurrentTime(10L);
            Assert.assertTrue(sourceContext.hasWatermark());
            Assert.assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());

            // Empty batch at offset 4. emptyQueue() is called directly rather than through the
            // synthetic accessor the decompiler exposed.
            fetcher.emitRecordsWithTimestamps(
                    AbstractFetcherWatermarksTest.<Long>emptyQueue(),
                    partitionState,
                    4L,
                    Long.MIN_VALUE);

            // Nothing new was collected, but the offset moved on.
            Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
            Assert.assertEquals(4L, partitionState.getOffset());

            // No further watermark is expected after the next clock advance.
            timeService.setCurrentTime(20L);
            Assert.assertFalse(sourceContext.hasWatermark());
        }

        /**
         * Checks that a fetcher created without partitions produces no watermark on a clock
         * advance, and that a partition added afterwards does produce one once it has received a
         * record.
         *
         * @throws Exception if the fetcher cannot be created, or adding the partition or emitting
         *     fails
         */
        @Test
        public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWatermarks()
                throws Exception {
            final Map<KafkaTopicPartition, Long> noPartitions = new HashMap<>();

            final TestSourceContext<Long> sourceContext = new TestSourceContext<>();
            final TestProcessingTimeService timeService = new TestProcessingTimeService();
            final TestFetcher<Long> fetcher =
                    new TestFetcher<>(
                            sourceContext,
                            noPartitions,
                            new SerializedValue<>(this.testWmStrategy),
                            timeService,
                            10L);

            // With nothing subscribed, advancing the clock must not yield a watermark.
            timeService.setCurrentTime(10L);
            Assert.assertFalse(sourceContext.hasWatermark());

            // Add one partition after construction and emit a single record with timestamp 100.
            fetcher.addDiscoveredPartitions(
                    Collections.singletonList(new KafkaTopicPartition("test topic name", 0)));
            final KafkaTopicPartitionState<Long, Object> discoveredState =
                    fetcher.subscribedPartitionStates().get(0);
            emitRecord(fetcher, 100L, discoveredState, 3L);

            // The next clock advance is expected to emit that record's timestamp as watermark.
            timeService.setCurrentTime(20L);
            Assert.assertEquals(100L, sourceContext.getLatestWatermark().getTimestamp());
        }

        /**
         * Recreates the two serializable lambdas of {@code getParams()} from their serialized
         * form.
         *
         * <p>The method is marked synthetic and was recovered by the decompiler; the previous
         * text was not valid Java (an {@code int} stored in a {@code boolean}, a {@code switch}
         * over a {@code boolean}, and lambdas returned as {@code Object} without a functional
         * interface target). This version performs the same checks in the same order using a
         * string switch and explicitly typed lambdas.
         *
         * <p>NOTE(review): a compiler normally generates this method itself for serializable
         * lambdas; confirm whether a hand-written copy should stay in the source at all.
         *
         * @param serializedLambda description of the lambda being deserialized
         * @return the matching lambda instance
         * @throws IllegalArgumentException if the description matches neither known lambda
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            final String implMethodName = serializedLambda.getImplMethodName();
            switch (implMethodName) {
                case "lambda$getParams$39bf3a82$1":
                    // Implementation-method kind 6 is compared verbatim from the decompiled
                    // check.
                    if (serializedLambda.getImplMethodKind() == 6
                            && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier")
                            && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator")
                            && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")
                            && serializedLambda.getImplClass().equals("org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest$PeriodicWatermarksSuite")
                            && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                        // Fully qualified cast: the interface is not imported by this file.
                        return (org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier<Long>)
                                context -> new PeriodicTestWatermarkGenerator();
                    }
                    break;
                case "lambda$getParams$375e1ff1$1":
                    if (serializedLambda.getImplMethodKind() == 6
                            && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner")
                            && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp")
                            && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J")
                            && serializedLambda.getImplClass().equals("org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest$PeriodicWatermarksSuite")
                            && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;J)J")) {
                        return (org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<Long>)
                                (l, j) -> l.longValue();
                    }
                    break;
                default:
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest$PunctuatedTestExtractor.class */
    /**
     * Punctuated assigner used by the tests: the element value is the timestamp, and a watermark
     * is produced only for timestamps that are multiples of three.
     */
    private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
        private PunctuatedTestExtractor() {
        }

        /**
         * Uses the element itself as its timestamp.
         *
         * @param element the record value
         * @param previousTimestamp ignored by this extractor
         * @return the unboxed value of {@code element}
         */
        public long extractTimestamp(Long element, long previousTimestamp) {
            final long elementTimestamp = element.longValue();
            return elementTimestamp;
        }

        /**
         * Decides whether the given timestamp triggers a watermark.
         *
         * @param lastElement ignored; only the timestamp is inspected
         * @param extractedTimestamp timestamp to test
         * @return a watermark at {@code extractedTimestamp} when it is divisible by three,
         *     otherwise {@code null}
         */
        @Nullable
        public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
            final boolean divisibleByThree = extractedTimestamp % 3 == 0;
            return divisibleByThree ? new Watermark(extractedTimestamp) : null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest$PunctuatedWatermarksSuite.class */
    public static class PunctuatedWatermarksSuite {
        /**
         * Checks that emitting an empty batch (standing in for a skipped, corrupted record)
         * advances the partition offset without collecting an element and without producing a
         * watermark.
         *
         * @throws Exception if the fetcher cannot be created or emitting fails
         */
        @Test
        public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception {
            // NOTE(review): presumably a sentinel start offset; kept verbatim - confirm its meaning.
            final long initialOffset = -915623761774L;

            final Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
            startOffsets.put(new KafkaTopicPartition("test topic name", 1), initialOffset);

            final TestSourceContext<Long> sourceContext = new TestSourceContext<>();
            // Typed local so the SerializedValue below is checked against WatermarkStrategy<Long>.
            final WatermarkStrategy<Long> punctuatedStrategy =
                    new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(
                            new PunctuatedTestExtractor());
            final TestFetcher<Long> fetcher =
                    new TestFetcher<>(
                            sourceContext,
                            startOffsets,
                            new SerializedValue<>(punctuatedStrategy),
                            new TestProcessingTimeService(),
                            0L);

            final KafkaTopicPartitionState<Long, Object> partitionState =
                    fetcher.subscribedPartitionStates().get(0);

            // Three regular records; timestamp 3 is a multiple of three, so a watermark is expected.
            emitRecord(fetcher, 1L, partitionState, 1L);
            emitRecord(fetcher, 2L, partitionState, 2L);
            emitRecord(fetcher, 3L, partitionState, 3L);
            Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
            // NOTE(review): the later assertFalse relies on getLatestWatermark() consuming the
            // pending watermark - confirm against TestSourceContext.
            Assert.assertTrue(sourceContext.hasWatermark());
            Assert.assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
            Assert.assertEquals(3L, partitionState.getOffset());

            // Empty batch at offset 4. emptyQueue() is called directly rather than through the
            // synthetic accessor the decompiler exposed.
            fetcher.emitRecordsWithTimestamps(
                    AbstractFetcherWatermarksTest.<Long>emptyQueue(), partitionState, 4L, -1L);

            // Nothing new was collected and no watermark appeared, but the offset moved on.
            Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
            Assert.assertFalse(sourceContext.hasWatermark());
            Assert.assertEquals(4L, partitionState.getOffset());
        }

        /**
         * Emits records into three partitions and checks after each step whether a watermark
         * reached the source context.
         *
         * <p>{@code PunctuatedTestExtractor} produces a per-record watermark only for timestamps
         * divisible by three. The expected overall watermarks (3, then 12, then 15) always equal
         * the smallest of the latest per-partition watermarks, and none is expected until every
         * partition has produced one. The combining logic itself lives in the fetcher and is not
         * visible in this file.
         *
         * @throws Exception if the fetcher cannot be created or emitting fails
         */
        @Test
        public void testPunctuatedWatermarks() throws Exception {
            final String topic = "test topic name";
            // NOTE(review): presumably a sentinel start offset; kept verbatim - confirm its meaning.
            final long initialOffset = -915623761774L;

            final Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
            startOffsets.put(new KafkaTopicPartition(topic, 7), initialOffset);
            startOffsets.put(new KafkaTopicPartition(topic, 13), initialOffset);
            startOffsets.put(new KafkaTopicPartition(topic, 21), initialOffset);

            final TestSourceContext<Long> sourceContext = new TestSourceContext<>();
            // Typed local so the SerializedValue below is checked against WatermarkStrategy<Long>.
            final WatermarkStrategy<Long> punctuatedStrategy =
                    new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(
                            new PunctuatedTestExtractor());
            final TestFetcher<Long> fetcher =
                    new TestFetcher<>(
                            sourceContext,
                            startOffsets,
                            new SerializedValue<>(punctuatedStrategy),
                            new TestProcessingTimeService(),
                            0L);

            final KafkaTopicPartitionState<Long, Object> part1 =
                    fetcher.subscribedPartitionStates().get(0);
            final KafkaTopicPartitionState<Long, Object> part2 =
                    fetcher.subscribedPartitionStates().get(1);
            final KafkaTopicPartitionState<Long, Object> part3 =
                    fetcher.subscribedPartitionStates().get(2);

            // Partition 1 reaches timestamp 3; the other partitions have no watermark yet.
            emitRecords(fetcher, Arrays.asList(1L, 2L), part1, 1L);
            emitRecord(fetcher, 2L, part1, 2L);
            emitRecords(fetcher, Arrays.asList(2L, 3L), part1, 3L);
            Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
            Assert.assertFalse(sourceContext.hasWatermark());

            // Partition 2 reaches 12; partition 3 still has no watermark.
            emitRecord(fetcher, 12L, part2, 1L);
            Assert.assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
            Assert.assertFalse(sourceContext.hasWatermark());

            // Partition 3 reaches 102 (divisible by three): expected watermark min(3, 12, 102) = 3.
            emitRecord(fetcher, 101L, part3, 1L);
            emitRecord(fetcher, 102L, part3, 2L);
            Assert.assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
            Assert.assertTrue(sourceContext.hasWatermark());
            Assert.assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());

            // Advance partition 3 to 1005.
            emitRecord(fetcher, 1003L, part3, 3L);
            emitRecord(fetcher, 1004L, part3, 4L);
            emitRecord(fetcher, 1005L, part3, 5L);
            Assert.assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());

            // Advance partition 1 to 30; partition 2 (at 12) now holds the minimum.
            emitRecord(fetcher, 30L, part1, 4L);
            Assert.assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
            Assert.assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
            Assert.assertTrue(sourceContext.hasWatermark());
            Assert.assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());

            // Partition 2: 13 and 14 are not divisible by three, 15 is.
            emitRecord(fetcher, 13L, part2, 2L);
            Assert.assertFalse(sourceContext.hasWatermark());
            emitRecord(fetcher, 14L, part2, 3L);
            Assert.assertFalse(sourceContext.hasWatermark());
            emitRecord(fetcher, 15L, part2, 3L);
            Assert.assertTrue(sourceContext.hasWatermark());
            Assert.assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest$TestFetcher.class */
    /**
     * Minimal {@link AbstractFetcher} for these tests.
     *
     * <p>Partition handles are plain {@code Object}s, and every operation that would talk to
     * Kafka throws {@link UnsupportedOperationException}; the tests only drive the
     * record-emitting and watermark paths inherited from the base class.
     *
     * @param <T> type of the emitted records
     */
    private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
        /**
         * Forwards all arguments to the base class, adding this class's class loader, an
         * unregistered metrics group and {@code false} for the final flag.
         *
         * @param context source context that receives records and watermarks
         * @param startOffsets partitions to subscribe to, with their initial offsets
         * @param serializedStrategy serialized watermark strategy to use
         * @param timeService processing-time service handed to the base class
         * @param interval numeric setting forwarded unchanged to the base class
         * @throws Exception if the base-class constructor fails
         */
        TestFetcher(
                SourceFunction.SourceContext<T> context,
                Map<KafkaTopicPartition, Long> startOffsets,
                SerializedValue<WatermarkStrategy<T>> serializedStrategy,
                ProcessingTimeService timeService,
                long interval)
                throws Exception {
            // NOTE(review): the meaning of the trailing `false` is defined by AbstractFetcher -
            // confirm there.
            super(
                    context,
                    startOffsets,
                    serializedStrategy,
                    timeService,
                    interval,
                    TestFetcher.class.getClassLoader(),
                    new UnregisteredMetricsGroup(),
                    false);
        }

        /** Not supported by this test fetcher. */
        public void runFetchLoop() {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test fetcher. */
        public void cancel() {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test fetcher. */
        protected void doCommitInternalOffsetsToKafka(
                Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback callback) {
            throw new UnsupportedOperationException();
        }

        /**
         * Creates a placeholder handle.
         *
         * @param partition ignored
         * @return a fresh {@code Object} on every call
         */
        protected Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
            return new Object();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Emits a single record through the fetcher for the given partition.
     *
     * <p>The record is wrapped in a one-element, properly typed queue (previously a raw
     * {@code ArrayDeque}) and passed to {@code emitRecordsWithTimestamps} together with the
     * offset; {@link Long#MIN_VALUE} is always passed as the final argument.
     *
     * @param abstractFetcher fetcher to emit through
     * @param t record to emit; must not be {@code null} because {@link ArrayDeque} rejects nulls
     * @param kafkaTopicPartitionState state of the partition the record belongs to
     * @param j offset passed along with the record
     * @param <T> record type
     * @param <KPH> partition handle type
     */
    public static <T, KPH> void emitRecord(AbstractFetcher<T, KPH> abstractFetcher, T t, KafkaTopicPartitionState<T, KPH> kafkaTopicPartitionState, long j) {
        final Queue<T> singleRecord = new ArrayDeque<>();
        singleRecord.add(t);
        abstractFetcher.emitRecordsWithTimestamps(singleRecord, kafkaTopicPartitionState, j, Long.MIN_VALUE);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Emits several records through the fetcher for the given partition in one call.
     *
     * <p>The list is copied into a properly typed queue (previously a raw {@code ArrayDeque})
     * and passed to {@code emitRecordsWithTimestamps} together with the offset;
     * {@link Long#MIN_VALUE} is always passed as the final argument.
     *
     * @param abstractFetcher fetcher to emit through
     * @param list records to emit, in order; must not contain {@code null} because
     *     {@link ArrayDeque} rejects nulls
     * @param kafkaTopicPartitionState state of the partition the records belong to
     * @param j offset passed along with the batch
     * @param <T> record type
     * @param <KPH> partition handle type
     */
    public static <T, KPH> void emitRecords(AbstractFetcher<T, KPH> abstractFetcher, List<T> list, KafkaTopicPartitionState<T, KPH> kafkaTopicPartitionState, long j) {
        final Queue<T> batch = new ArrayDeque<>(list);
        abstractFetcher.emitRecordsWithTimestamps(batch, kafkaTopicPartitionState, j, Long.MIN_VALUE);
    }

    /**
     * Creates a new, empty, mutable queue.
     *
     * <p>Uses the diamond form so the result is a properly typed {@code ArrayDeque<T>} rather
     * than a raw {@code ArrayDeque} converted unchecked to {@code Queue<T>}.
     *
     * @param <T> element type of the queue
     * @return a fresh empty queue on every call
     */
    private static <T> Queue<T> emptyQueue() {
        return new ArrayDeque<>();
    }

    /**
     * Synthetic accessor that gives the nested suites a way to reach the private
     * {@code emptyQueue()} helper.
     *
     * @return a fresh empty queue, as produced by {@code emptyQueue()}
     */
    static /* synthetic */ Queue access$200() {
        final Queue<Object> freshQueue = AbstractFetcherWatermarksTest.<Object>emptyQueue();
        return freshQueue;
    }
}
