package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.class */
public class AbstractFetcherTimestampsTest {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest$PeriodicTestExtractor.class */
    private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
        private volatile long maxTimestamp;

        private PeriodicTestExtractor() {
            this.maxTimestamp = Long.MIN_VALUE;
        }

        public long extractTimestamp(Long l, long j) {
            this.maxTimestamp = Math.max(this.maxTimestamp, l.longValue());
            return l.longValue();
        }

        @Nullable
        public Watermark getCurrentWatermark() {
            return new Watermark(this.maxTimestamp);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest$PunctuatedTestExtractor.class */
    private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
        private PunctuatedTestExtractor() {
        }

        public long extractTimestamp(Long l, long j) {
            return l.longValue();
        }

        @Nullable
        public Watermark checkAndGetNextWatermark(Long l, long j) {
            if (j % 3 == 0) {
                return new Watermark(j);
            }
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest$TestFetcher.class */
    private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
        protected TestFetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> list, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext) throws Exception {
            super(sourceContext, list, serializedValue, serializedValue2, streamingRuntimeContext, false);
        }

        public void runFetchLoop() throws Exception {
            throw new UnsupportedOperationException();
        }

        public void cancel() {
            throw new UnsupportedOperationException();
        }

        public Object createKafkaPartitionHandle(KafkaTopicPartition kafkaTopicPartition) {
            return new Object();
        }

        public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> map) throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest$TestSourceContext.class */
    private static final class TestSourceContext<T> implements SourceFunction.SourceContext<T> {
        private final Object checkpointLock;
        private final Object watermarkLock;
        private volatile StreamRecord<T> latestElement;
        private volatile Watermark currentWatermark;

        private TestSourceContext() {
            this.checkpointLock = new Object();
            this.watermarkLock = new Object();
        }

        public void collect(T t) {
            throw new UnsupportedOperationException();
        }

        public void collectWithTimestamp(T t, long j) {
            this.latestElement = new StreamRecord<>(t, j);
        }

        public void emitWatermark(Watermark watermark) {
            synchronized (this.watermarkLock) {
                this.currentWatermark = watermark;
                this.watermarkLock.notifyAll();
            }
        }

        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        public void close() {
        }

        public StreamRecord<T> getLatestElement() {
            return this.latestElement;
        }

        public boolean hasWatermark() {
            return this.currentWatermark != null;
        }

        public Watermark getLatestWatermark() throws InterruptedException {
            Watermark watermark;
            synchronized (this.watermarkLock) {
                while (this.currentWatermark == null) {
                    this.watermarkLock.wait();
                }
                watermark = this.currentWatermark;
                this.currentWatermark = null;
            }
            return watermark;
        }
    }

    @Test
    public void testPunctuatedWatermarks() throws Exception {
        List asList = Arrays.asList(new KafkaTopicPartition("test topic name", 7), new KafkaTopicPartition("test topic name", 13), new KafkaTopicPartition("test topic name", 21));
        TestSourceContext testSourceContext = new TestSourceContext();
        TestFetcher testFetcher = new TestFetcher(testSourceContext, asList, null, new SerializedValue(new PunctuatedTestExtractor()), new MockRuntimeContext(17, 3));
        KafkaTopicPartitionState kafkaTopicPartitionState = testFetcher.subscribedPartitions()[0];
        KafkaTopicPartitionState kafkaTopicPartitionState2 = testFetcher.subscribedPartitions()[1];
        KafkaTopicPartitionState kafkaTopicPartitionState3 = testFetcher.subscribedPartitions()[2];
        testFetcher.emitRecord(1L, kafkaTopicPartitionState, 1L);
        testFetcher.emitRecord(2L, kafkaTopicPartitionState, 2L);
        testFetcher.emitRecord(3L, kafkaTopicPartitionState, 3L);
        Assert.assertEquals(3L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(3L, testSourceContext.getLatestElement().getTimestamp());
        Assert.assertFalse(testSourceContext.hasWatermark());
        testFetcher.emitRecord(12L, kafkaTopicPartitionState2, 1L);
        Assert.assertEquals(12L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(12L, testSourceContext.getLatestElement().getTimestamp());
        Assert.assertFalse(testSourceContext.hasWatermark());
        testFetcher.emitRecord(101L, kafkaTopicPartitionState3, 1L);
        testFetcher.emitRecord(102L, kafkaTopicPartitionState3, 2L);
        Assert.assertEquals(102L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(102L, testSourceContext.getLatestElement().getTimestamp());
        Assert.assertTrue(testSourceContext.hasWatermark());
        Assert.assertEquals(3L, testSourceContext.getLatestWatermark().getTimestamp());
        testFetcher.emitRecord(1003L, kafkaTopicPartitionState3, 3L);
        testFetcher.emitRecord(1004L, kafkaTopicPartitionState3, 4L);
        testFetcher.emitRecord(1005L, kafkaTopicPartitionState3, 5L);
        Assert.assertEquals(1005L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(1005L, testSourceContext.getLatestElement().getTimestamp());
        testFetcher.emitRecord(30L, kafkaTopicPartitionState, 4L);
        Assert.assertEquals(30L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(30L, testSourceContext.getLatestElement().getTimestamp());
        Assert.assertTrue(testSourceContext.hasWatermark());
        Assert.assertEquals(12L, testSourceContext.getLatestWatermark().getTimestamp());
        testFetcher.emitRecord(13L, kafkaTopicPartitionState2, 2L);
        Assert.assertFalse(testSourceContext.hasWatermark());
        testFetcher.emitRecord(14L, kafkaTopicPartitionState2, 3L);
        Assert.assertFalse(testSourceContext.hasWatermark());
        testFetcher.emitRecord(15L, kafkaTopicPartitionState2, 3L);
        Assert.assertTrue(testSourceContext.hasWatermark());
        Assert.assertEquals(15L, testSourceContext.getLatestWatermark().getTimestamp());
    }

    @Test
    public void testPeriodicWatermarks() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setAutoWatermarkInterval(10L);
        List asList = Arrays.asList(new KafkaTopicPartition("test topic name", 7), new KafkaTopicPartition("test topic name", 13), new KafkaTopicPartition("test topic name", 21));
        TestSourceContext testSourceContext = new TestSourceContext();
        TestFetcher testFetcher = new TestFetcher(testSourceContext, asList, new SerializedValue(new PeriodicTestExtractor()), null, new MockRuntimeContext(17, 3, executionConfig, testSourceContext.getCheckpointLock()));
        KafkaTopicPartitionState kafkaTopicPartitionState = testFetcher.subscribedPartitions()[0];
        KafkaTopicPartitionState kafkaTopicPartitionState2 = testFetcher.subscribedPartitions()[1];
        KafkaTopicPartitionState kafkaTopicPartitionState3 = testFetcher.subscribedPartitions()[2];
        testFetcher.emitRecord(1L, kafkaTopicPartitionState, 1L);
        testFetcher.emitRecord(2L, kafkaTopicPartitionState, 2L);
        testFetcher.emitRecord(3L, kafkaTopicPartitionState, 3L);
        Assert.assertEquals(3L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(3L, testSourceContext.getLatestElement().getTimestamp());
        testFetcher.emitRecord(12L, kafkaTopicPartitionState2, 1L);
        Assert.assertEquals(12L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(12L, testSourceContext.getLatestElement().getTimestamp());
        testFetcher.emitRecord(101L, kafkaTopicPartitionState3, 1L);
        testFetcher.emitRecord(102L, kafkaTopicPartitionState3, 2L);
        Assert.assertEquals(102L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(102L, testSourceContext.getLatestElement().getTimestamp());
        Assert.assertEquals(3L, testSourceContext.getLatestWatermark().getTimestamp());
        testFetcher.emitRecord(1003L, kafkaTopicPartitionState3, 3L);
        testFetcher.emitRecord(1004L, kafkaTopicPartitionState3, 4L);
        testFetcher.emitRecord(1005L, kafkaTopicPartitionState3, 5L);
        Assert.assertEquals(1005L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(1005L, testSourceContext.getLatestElement().getTimestamp());
        testFetcher.emitRecord(30L, kafkaTopicPartitionState, 4L);
        Assert.assertEquals(30L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(30L, testSourceContext.getLatestElement().getTimestamp());
        Assert.assertEquals(12L, testSourceContext.getLatestWatermark().getTimestamp());
        testFetcher.emitRecord(13L, kafkaTopicPartitionState2, 2L);
        testFetcher.emitRecord(14L, kafkaTopicPartitionState2, 3L);
        testFetcher.emitRecord(15L, kafkaTopicPartitionState2, 3L);
        long timestamp = testSourceContext.getLatestWatermark().getTimestamp();
        Assert.assertTrue(timestamp >= 13 && timestamp <= 15);
    }
}
