package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.class */
public class TimestampsAndPeriodicWatermarksOperatorTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest$LongExtractor.class */
    private static class LongExtractor implements AssignerWithPeriodicWatermarks<Long> {
        private static final long serialVersionUID = 1;
        private long currentTimestamp;

        private LongExtractor() {
            this.currentTimestamp = Long.MIN_VALUE;
        }

        public long extractTimestamp(Long l, long j) {
            this.currentTimestamp = l.longValue();
            return l.longValue();
        }

        public Watermark getCurrentWatermark() {
            return new Watermark(this.currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : this.currentTimestamp - serialVersionUID);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest$NeverWatermarkExtractor.class */
    private static class NeverWatermarkExtractor implements AssignerWithPeriodicWatermarks<Long> {
        private static final long serialVersionUID = 1;

        private NeverWatermarkExtractor() {
        }

        public long extractTimestamp(Long l, long j) {
            return l.longValue();
        }

        public Watermark getCurrentWatermark() {
            return null;
        }
    }

    @Test
    public void testTimestampsAndPeriodicWatermarksOperator() throws Exception {
        TimestampsAndPeriodicWatermarksOperator timestampsAndPeriodicWatermarksOperator = new TimestampsAndPeriodicWatermarksOperator(new LongExtractor());
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setAutoWatermarkInterval(50L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(timestampsAndPeriodicWatermarksOperator, executionConfig);
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(1L, 1L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(2L, 1L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(3L, 3L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(4L, 3L));
        ConcurrentLinkedQueue<Object> output = oneInputStreamOperatorTestHarness.getOutput();
        long j = 1;
        long j2 = -1;
        while (j2 < 3) {
            if (output.size() > 0) {
                Object poll = output.poll();
                Assert.assertNotNull(poll);
                Tuple2<Long, Long> validateElement = validateElement(poll, j, j2);
                j = ((Long) validateElement.f0).longValue();
                j2 = ((Long) validateElement.f1).longValue();
                Assert.assertTrue(j2 < j);
            } else {
                Thread.sleep(10L);
            }
        }
        output.clear();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(4L, 4L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(5L, 4L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(6L, 4L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(7L, 4L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(8L, 4L));
        ConcurrentLinkedQueue<Object> output2 = oneInputStreamOperatorTestHarness.getOutput();
        long j3 = 4;
        long j4 = 2;
        while (j4 < 7) {
            if (output2.size() > 0) {
                Object poll2 = output2.poll();
                Assert.assertNotNull(poll2);
                Tuple2<Long, Long> validateElement2 = validateElement(poll2, j3, j4);
                j3 = ((Long) validateElement2.f0).longValue();
                j4 = ((Long) validateElement2.f1).longValue();
                Assert.assertTrue(j4 < j3);
            } else {
                Thread.sleep(10L);
            }
        }
        output2.clear();
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        Assert.assertEquals(Long.MAX_VALUE, ((Watermark) oneInputStreamOperatorTestHarness.getOutput().poll()).getTimestamp());
    }

    @Test
    public void testNegativeTimestamps() throws Exception {
        TimestampsAndPeriodicWatermarksOperator timestampsAndPeriodicWatermarksOperator = new TimestampsAndPeriodicWatermarksOperator(new NeverWatermarkExtractor());
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setAutoWatermarkInterval(50L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(timestampsAndPeriodicWatermarksOperator, executionConfig);
        oneInputStreamOperatorTestHarness.open();
        long[] jArr = {Long.MIN_VALUE, -1, 0, 1, 2, 3, Long.MAX_VALUE};
        for (long j : jArr) {
            oneInputStreamOperatorTestHarness.processElement(new StreamRecord(Long.valueOf(j)));
        }
        ConcurrentLinkedQueue<Object> output = oneInputStreamOperatorTestHarness.getOutput();
        for (long j2 : jArr) {
            Assert.assertEquals(j2, ((StreamRecord) output.poll()).getTimestamp());
        }
    }

    private Tuple2<Long, Long> validateElement(Object obj, long j, long j2) {
        if (obj instanceof StreamRecord) {
            StreamRecord streamRecord = (StreamRecord) obj;
            Assert.assertEquals(j, ((Long) streamRecord.getValue()).longValue());
            Assert.assertEquals(j, streamRecord.getTimestamp());
            return new Tuple2<>(Long.valueOf(j + 1), Long.valueOf(j2));
        }
        if (!(obj instanceof Watermark)) {
            throw new IllegalArgumentException("unrecognized element: " + obj);
        }
        long timestamp = ((Watermark) obj).getTimestamp();
        Assert.assertTrue(timestamp > j2);
        return new Tuple2<>(Long.valueOf(j), Long.valueOf(timestamp));
    }
}
