/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class AbstractFetcherTest {
    /**
     * Checks that emitting a {@code null} record leaves the last collected element untouched
     * while the offset stored in the partition state still moves forward.
     */
    @Test
    public void testSkipCorruptedRecord() throws Exception {
        final String testTopic = "test topic name";

        // NOTE(review): -915623761774L looks like a sentinel start offset rather than a real
        // Kafka offset - confirm against the connector's sentinel constants.
        final Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<KafkaTopicPartition, Long>();
        originalPartitions.put(new KafkaTopicPartition(testTopic, 1), -915623761774L);

        final TestSourceContext<Long> sourceContext = new TestSourceContext<Long>();
        final TestFetcher<Long> fetcher = new TestFetcher<Long>(
                sourceContext,
                originalPartitions,
                null, // no periodic watermark assigner
                null, // no punctuated watermark assigner
                Mockito.mock(TestProcessingTimeService.class),
                0L);

        final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];

        // two regular records: the latest element and the offset both follow the last one
        fetcher.emitRecord(1L, partitionStateHolder, 1L);
        fetcher.emitRecord(2L, partitionStateHolder, 2L);
        Assert.assertEquals(2L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(2L, partitionStateHolder.getOffset());

        // a null record: nothing new is collected, but the offset advances to 3
        fetcher.emitRecord(null, partitionStateHolder, 3L);
        Assert.assertEquals(2L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(3L, partitionStateHolder.getOffset());
    }

    /**
     * Same scenario as the plain corrupted-record test, but with a punctuated watermark
     * assigner: a {@code null} record must neither be collected nor produce a watermark, while
     * the partition offset still advances.
     */
    @Test
    public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception {
        final String testTopic = "test topic name";

        // NOTE(review): -915623761774L looks like a sentinel start offset rather than a real
        // Kafka offset - confirm against the connector's sentinel constants.
        final Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<KafkaTopicPartition, Long>();
        originalPartitions.put(new KafkaTopicPartition(testTopic, 1), -915623761774L);

        final TestSourceContext<Long> sourceContext = new TestSourceContext<Long>();
        final TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();

        final TestFetcher<Long> fetcher = new TestFetcher<Long>(
                sourceContext,
                originalPartitions,
                null, // no periodic watermark assigner
                new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
                processingTimeProvider,
                0L);

        final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];

        // the test extractor yields a watermark for timestamps divisible by 3, i.e. for record 3
        fetcher.emitRecord(1L, partitionStateHolder, 1L);
        fetcher.emitRecord(2L, partitionStateHolder, 2L);
        fetcher.emitRecord(3L, partitionStateHolder, 3L);
        Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
        Assert.assertTrue(sourceContext.hasWatermark());
        // getLatestWatermark() also clears the stored watermark, so the later
        // hasWatermark() check only sees watermarks emitted after this point
        Assert.assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
        Assert.assertEquals(3L, partitionStateHolder.getOffset());

        // a null record: no new element, no new watermark, but the offset advances to 4
        fetcher.emitRecord(null, partitionStateHolder, 4L);
        Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
        Assert.assertFalse(sourceContext.hasWatermark());
        Assert.assertEquals(4L, partitionStateHolder.getOffset());
    }

    /**
     * Same scenario as the plain corrupted-record test, but with a periodic watermark assigner
     * and a watermark interval of 10: a {@code null} record must not be collected and no
     * further watermark is expected after it, while the partition offset still advances.
     */
    @Test
    public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception {
        final String testTopic = "test topic name";

        // NOTE(review): -915623761774L looks like a sentinel start offset rather than a real
        // Kafka offset - confirm against the connector's sentinel constants.
        final Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<KafkaTopicPartition, Long>();
        originalPartitions.put(new KafkaTopicPartition(testTopic, 1), -915623761774L);

        final TestSourceContext<Long> sourceContext = new TestSourceContext<Long>();
        final TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();

        final TestFetcher<Long> fetcher = new TestFetcher<Long>(
                sourceContext,
                originalPartitions,
                new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
                null, // no punctuated watermark assigner
                processingTimeProvider,
                10L);

        final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];

        fetcher.emitRecord(1L, partitionStateHolder, 1L);
        fetcher.emitRecord(2L, partitionStateHolder, 2L);
        fetcher.emitRecord(3L, partitionStateHolder, 3L);
        Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
        Assert.assertEquals(3L, partitionStateHolder.getOffset());

        // move processing time to the first interval boundary; a watermark of 3 is expected
        processingTimeProvider.setCurrentTime(10L);
        Assert.assertTrue(sourceContext.hasWatermark());
        // getLatestWatermark() also clears the stored watermark
        Assert.assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());

        // a null record: no new element, but the offset advances to 4
        fetcher.emitRecord(null, partitionStateHolder, 4L);
        Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
        Assert.assertEquals(4L, partitionStateHolder.getOffset());

        // next interval boundary: the test expects that no further watermark was emitted
        processingTimeProvider.setCurrentTime(20L);
        Assert.assertFalse(sourceContext.hasWatermark());
    }

    /**
     * Exercises punctuated watermarks over three partitions. The test extractor produces a
     * per-record watermark only for timestamps divisible by 3; the assertions below pin which
     * watermark reaches the source context after each step.
     *
     * <p>NOTE(review): the expected values are consistent with the fetcher emitting the minimum
     * watermark across all partitions, and only once every partition has one - that logic lives
     * in {@code AbstractFetcher} and is not shown in this file; confirm there.
     */
    @Test
    public void testPunctuatedWatermarks() throws Exception {
        final String testTopic = "test topic name";

        // NOTE(review): -915623761774L looks like a sentinel start offset rather than a real
        // Kafka offset - confirm against the connector's sentinel constants.
        final Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<KafkaTopicPartition, Long>();
        originalPartitions.put(new KafkaTopicPartition(testTopic, 7), -915623761774L);
        originalPartitions.put(new KafkaTopicPartition(testTopic, 13), -915623761774L);
        originalPartitions.put(new KafkaTopicPartition(testTopic, 21), -915623761774L);

        final TestSourceContext<Long> sourceContext = new TestSourceContext<Long>();
        final TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();

        final TestFetcher<Long> fetcher = new TestFetcher<Long>(
                sourceContext,
                originalPartitions,
                null, // no periodic watermark assigner
                new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
                processingTimeProvider,
                0L);

        final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
        final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
        final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];

        // elements for partition 1 only: no watermark is expected yet
        fetcher.emitRecord(1L, part1, 1L);
        fetcher.emitRecord(2L, part1, 2L);
        fetcher.emitRecord(3L, part1, 3L);
        Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
        Assert.assertFalse(sourceContext.hasWatermark());

        // one element for partition 2: still no watermark expected
        fetcher.emitRecord(12L, part2, 1L);
        Assert.assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
        Assert.assertFalse(sourceContext.hasWatermark());

        // elements for partition 3 (102 is divisible by 3): now a watermark of 3 is expected
        fetcher.emitRecord(101L, part3, 1L);
        fetcher.emitRecord(102L, part3, 2L);
        Assert.assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
        Assert.assertTrue(sourceContext.hasWatermark());
        // getLatestWatermark() also clears the stored watermark
        Assert.assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());

        // advance partition 3
        fetcher.emitRecord(1003L, part3, 3L);
        fetcher.emitRecord(1004L, part3, 4L);
        fetcher.emitRecord(1005L, part3, 5L);
        Assert.assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());

        // advance partition 1 beyond partition 2: the expected watermark becomes 12
        fetcher.emitRecord(30L, part1, 4L);
        Assert.assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
        Assert.assertTrue(sourceContext.hasWatermark());
        Assert.assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());

        // advance partition 2: 13 and 14 yield no watermark, 15 does
        fetcher.emitRecord(13L, part2, 2L);
        Assert.assertFalse(sourceContext.hasWatermark());
        fetcher.emitRecord(14L, part2, 3L);
        Assert.assertFalse(sourceContext.hasWatermark());
        fetcher.emitRecord(15L, part2, 3L);
        Assert.assertTrue(sourceContext.hasWatermark());
        Assert.assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
    }

    /**
     * Exercises periodic watermarks over three partitions with a watermark interval of 10.
     * Processing time is advanced manually through the {@link TestProcessingTimeService}, and
     * after each step the test reads the watermark that reached the source context.
     *
     * <p>NOTE(review): the expected values are consistent with the fetcher emitting the minimum
     * of the per-partition watermarks - that logic lives in {@code AbstractFetcher} and is not
     * shown in this file; confirm there.
     */
    @Test
    public void testPeriodicWatermarks() throws Exception {
        final String testTopic = "test topic name";

        // NOTE(review): -915623761774L looks like a sentinel start offset rather than a real
        // Kafka offset - confirm against the connector's sentinel constants.
        final Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<KafkaTopicPartition, Long>();
        originalPartitions.put(new KafkaTopicPartition(testTopic, 7), -915623761774L);
        originalPartitions.put(new KafkaTopicPartition(testTopic, 13), -915623761774L);
        originalPartitions.put(new KafkaTopicPartition(testTopic, 21), -915623761774L);

        final TestSourceContext<Long> sourceContext = new TestSourceContext<Long>();
        final TestProcessingTimeService processingTimeService = new TestProcessingTimeService();

        final TestFetcher<Long> fetcher = new TestFetcher<Long>(
                sourceContext,
                originalPartitions,
                new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
                null, // no punctuated watermark assigner
                processingTimeService,
                10L);

        final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
        final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
        final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];

        // elements for partition 1
        fetcher.emitRecord(1L, part1, 1L);
        fetcher.emitRecord(2L, part1, 2L);
        fetcher.emitRecord(3L, part1, 3L);
        Assert.assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(3L, sourceContext.getLatestElement().getTimestamp());

        // one element for partition 2
        fetcher.emitRecord(12L, part2, 1L);
        Assert.assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(12L, sourceContext.getLatestElement().getTimestamp());

        // elements for partition 3
        fetcher.emitRecord(101L, part3, 1L);
        fetcher.emitRecord(102L, part3, 2L);
        Assert.assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(102L, sourceContext.getLatestElement().getTimestamp());

        // first interval boundary; getLatestWatermark() waits until a watermark is available
        // and clears it after reading
        processingTimeService.setCurrentTime(10L);
        Assert.assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());

        // advance partition 3
        fetcher.emitRecord(1003L, part3, 3L);
        fetcher.emitRecord(1004L, part3, 4L);
        fetcher.emitRecord(1005L, part3, 5L);
        Assert.assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());

        // advance partition 1 beyond partition 2: the expected watermark becomes 12
        fetcher.emitRecord(30L, part1, 4L);
        Assert.assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
        Assert.assertEquals(30L, sourceContext.getLatestElement().getTimestamp());

        processingTimeService.setCurrentTime(20L);
        Assert.assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());

        // advance partition 2 again
        fetcher.emitRecord(13L, part2, 2L);
        fetcher.emitRecord(14L, part2, 3L);
        fetcher.emitRecord(15L, part2, 3L);

        processingTimeService.setCurrentTime(30L);
        // the test accepts any watermark between 13 and 15 here
        final long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
        Assert.assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
    }

    /**
     * Punctuated assigner used by the tests: every element serves as its own timestamp, and a
     * watermark is produced only when that timestamp is a multiple of three.
     */
    private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
        private PunctuatedTestExtractor() {
        }

        /** Returns the element's own value as its timestamp; the previous timestamp is ignored. */
        public long extractTimestamp(Long element, long previousElementTimestamp) {
            return element.longValue();
        }

        /**
         * Returns a watermark carrying {@code extractedTimestamp} when it is divisible by 3,
         * otherwise {@code null}.
         */
        @Nullable
        public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
            if (extractedTimestamp % 3L != 0L) {
                return null;
            }
            return new Watermark(extractedTimestamp);
        }
    }

    /**
     * Periodic assigner used by the tests: every element serves as its own timestamp, and the
     * current watermark is the largest timestamp seen so far.
     */
    private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
        /** Largest timestamp extracted so far; starts at {@link Long#MIN_VALUE}. */
        private volatile long maxTimestamp = Long.MIN_VALUE;

        private PeriodicTestExtractor() {
        }

        /** Records the element's value as a candidate maximum and returns it as the timestamp. */
        public long extractTimestamp(Long element, long previousElementTimestamp) {
            final long highestSoFar = Math.max(this.maxTimestamp, element);
            this.maxTimestamp = highestSoFar;
            return element;
        }

        /** Returns a watermark carrying the largest timestamp recorded so far. */
        @Nullable
        public Watermark getCurrentWatermark() {
            final long snapshot = this.maxTimestamp;
            return new Watermark(snapshot);
        }
    }

    /**
     * Minimal {@link SourceFunction.SourceContext} for the tests. It remembers only the most
     * recently collected element and the most recently emitted watermark.
     *
     * <p>The watermark slot is consumed on read: {@link #getLatestWatermark()} hands out the
     * stored watermark and clears it, so {@link #hasWatermark()} afterwards reports only
     * watermarks emitted since that read.
     */
    private static final class TestSourceContext<T> implements SourceFunction.SourceContext<T> {
        private final Object checkpointLock = new Object();
        /** Guards hand-over of {@link #currentWatermark} between emitter and reader. */
        private final Object watermarkLock = new Object();
        private volatile StreamRecord<T> latestElement;
        private volatile Watermark currentWatermark;

        private TestSourceContext() {
        }

        /** Stores the element (without a timestamp) as the latest element. */
        public void collect(T element) {
            this.latestElement = new StreamRecord<T>(element);
        }

        /** Stores the element together with its timestamp as the latest element. */
        public void collectWithTimestamp(T element, long timestamp) {
            this.latestElement = new StreamRecord<T>(element, timestamp);
        }

        /** Stores the watermark and wakes up any thread waiting in {@link #getLatestWatermark()}. */
        public void emitWatermark(Watermark mark) {
            synchronized (this.watermarkLock) {
                this.currentWatermark = mark;
                this.watermarkLock.notifyAll();
            }
        }

        /** Not supported by this test context. */
        public void markAsTemporarilyIdle() {
            throw new UnsupportedOperationException();
        }

        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        /** Nothing to release. */
        public void close() {
        }

        /** Returns the most recently collected element, or {@code null} if none was collected. */
        public StreamRecord<T> getLatestElement() {
            return this.latestElement;
        }

        /** Returns whether a watermark is stored that has not yet been read. */
        public boolean hasWatermark() {
            return this.currentWatermark != null;
        }

        /**
         * Waits until a watermark is available, then returns it and clears the slot.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public Watermark getLatestWatermark() throws InterruptedException {
            synchronized (this.watermarkLock) {
                // loop guards against wake-ups where no watermark has been stored
                while (this.currentWatermark == null) {
                    this.watermarkLock.wait();
                }
                final Watermark wm = this.currentWatermark;
                this.currentWatermark = null;
                return wm;
            }
        }
    }

    /**
     * Bare-bones {@link AbstractFetcher} for the tests. It uses a plain {@link Object} as the
     * Kafka partition handle and rejects every operation the tests do not drive through the
     * base class.
     */
    private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {

        /**
         * Passes all arguments straight to the base class, together with this class's own
         * class loader and {@code false} as the final base-constructor argument.
         */
        protected TestFetcher(
                SourceFunction.SourceContext<T> sourceContext,
                Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
                SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                ProcessingTimeService processingTimeProvider,
                long autoWatermarkInterval) throws Exception {
            super(
                    sourceContext,
                    assignedPartitionsWithStartOffsets,
                    watermarksPeriodic,
                    watermarksPunctuated,
                    processingTimeProvider,
                    autoWatermarkInterval,
                    TestFetcher.class.getClassLoader(),
                    false);
        }

        /** Not supported by this test fetcher. */
        public void runFetchLoop() throws Exception {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test fetcher. */
        public void cancel() {
            throw new UnsupportedOperationException();
        }

        /** Returns a fresh placeholder object as the handle for any partition. */
        public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
            return new Object();
        }

        /** Not supported by this test fetcher. */
        public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
            throw new UnsupportedOperationException();
        }
    }
}

