package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.class */
public class TimestampsAndPunctuatedWatermarksOperatorTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest$NeverWatermarkExtractor.class */
    private static class NeverWatermarkExtractor implements AssignerWithPunctuatedWatermarks<Long> {
        private static final long serialVersionUID = 1;

        private NeverWatermarkExtractor() {
        }

        public long extractTimestamp(Long l, long j) {
            return l.longValue();
        }

        public Watermark checkAndGetNextWatermark(Long l, long j) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest$PunctuatedExtractor.class */
    private static class PunctuatedExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Boolean>> {
        private static final long serialVersionUID = 1;

        private PunctuatedExtractor() {
        }

        public long extractTimestamp(Tuple2<Long, Boolean> tuple2, long j) {
            return ((Long) tuple2.f0).longValue();
        }

        public Watermark checkAndGetNextWatermark(Tuple2<Long, Boolean> tuple2, long j) {
            if (((Boolean) tuple2.f1).booleanValue()) {
                return new Watermark(j);
            }
            return null;
        }
    }

    @Test
    public void testTimestampsAndPeriodicWatermarksOperator() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new TimestampsAndPunctuatedWatermarksOperator(new PunctuatedExtractor()));
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(3L, true), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(5L, false), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(4L, false), 0L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(10L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(4L, false), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(4L, true), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(9L, false), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(5L, false), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(7L, true), 0L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Tuple2(10L, false), 0L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        ConcurrentLinkedQueue<Object> output = oneInputStreamOperatorTestHarness.getOutput();
        Assert.assertEquals(3L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(3L, ((Watermark) output.poll()).getTimestamp());
        Assert.assertEquals(5L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(4L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(4L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(4L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(4L, ((Watermark) output.poll()).getTimestamp());
        Assert.assertEquals(9L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(5L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(7L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(7L, ((Watermark) output.poll()).getTimestamp());
        Assert.assertEquals(10L, ((StreamRecord) output.poll()).getTimestamp());
        Assert.assertEquals(Long.MAX_VALUE, ((Watermark) output.poll()).getTimestamp());
    }

    @Test
    public void testZeroOnNegativeTimestamps() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new TimestampsAndPunctuatedWatermarksOperator(new NeverWatermarkExtractor()));
        oneInputStreamOperatorTestHarness.open();
        long[] jArr = {Long.MIN_VALUE, -1, 0, 1, 2, 3, Long.MAX_VALUE};
        for (long j : jArr) {
            oneInputStreamOperatorTestHarness.processElement(new StreamRecord(Long.valueOf(j)));
        }
        ConcurrentLinkedQueue<Object> output = oneInputStreamOperatorTestHarness.getOutput();
        for (long j2 : jArr) {
            Assert.assertEquals(j2, ((StreamRecord) output.poll()).getTimestamp());
        }
    }
}
