package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest.class */
public class ProcessOperatorTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$QueryingProcessFunction.class */
    private static class QueryingProcessFunction extends ProcessFunction<Integer, String> {
        private static final long serialVersionUID = 1;
        private final TimeDomain timeDomain;

        public QueryingProcessFunction(TimeDomain timeDomain) {
            this.timeDomain = timeDomain;
        }

        public void processElement(Integer num, ProcessFunction<Integer, String>.Context context, Collector<String> collector) throws Exception {
            if (this.timeDomain.equals(TimeDomain.EVENT_TIME)) {
                collector.collect(num + "TIME:" + context.timerService().currentWatermark() + " TS:" + context.timestamp());
            } else {
                collector.collect(num + "TIME:" + context.timerService().currentProcessingTime() + " TS:" + context.timestamp());
            }
        }

        public void onTimer(long j, ProcessFunction<Integer, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (ProcessFunction<Integer, String>.Context) context, (Collector<String>) collector);
        }
    }

    @Test
    public void testTimestampAndWatermarkQuerying() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new ProcessOperator(new QueryingProcessFunction(TimeDomain.EVENT_TIME)));
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(17L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(5, 12L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(42L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(6, 13L));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new Watermark(17L));
        concurrentLinkedQueue.add(new StreamRecord("5TIME:17 TS:12", 12L));
        concurrentLinkedQueue.add(new Watermark(42L));
        concurrentLinkedQueue.add(new StreamRecord("6TIME:42 TS:13", 13L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testTimestampAndProcessingTimeQuerying() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new ProcessOperator(new QueryingProcessFunction(TimeDomain.PROCESSING_TIME)));
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.setProcessingTime(17L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(5));
        oneInputStreamOperatorTestHarness.setProcessingTime(42L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(6));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord("5TIME:17 TS:null"));
        concurrentLinkedQueue.add(new StreamRecord("6TIME:42 TS:null"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput());
        oneInputStreamOperatorTestHarness.close();
    }
}
