package org.apache.flink.streaming.api.operators.co;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

/**
 * Tests for {@link CoProcessOperator}, checking that a {@link CoProcessFunction} can query the
 * element timestamp together with either the current watermark or the current processing time
 * while processing elements from both inputs.
 */
public class CoProcessOperatorTest extends TestLogger {

    /**
     * Feeds watermarks and timestamped elements into both inputs and verifies that each emitted
     * record reports the watermark and element timestamp seen by the function.
     */
    @Test
    public void testTimestampAndWatermarkQuerying() throws Exception {
        CoProcessOperator<Integer, String, String> operator =
                new CoProcessOperator<>(new WatermarkQueryingProcessFunction());

        TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
                new TwoInputStreamOperatorTestHarness<>(operator);

        testHarness.setup();
        testHarness.open();

        // The same watermark is sent on both inputs before each element.
        testHarness.processWatermark1(new Watermark(17L));
        testHarness.processWatermark2(new Watermark(17L));
        testHarness.processElement1(new StreamRecord<>(5, 12L));

        testHarness.processWatermark1(new Watermark(42L));
        testHarness.processWatermark2(new Watermark(42L));
        testHarness.processElement2(new StreamRecord<>("6", 13L));

        // Only one watermark is expected in the output for each pair of input watermarks.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new Watermark(17L));
        expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
        expectedOutput.add(new Watermark(42L));
        expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.close();
    }

    /**
     * Advances the harness processing time between elements that carry no timestamp and verifies
     * that each emitted record reports the processing time and a {@code null} element timestamp.
     */
    @Test
    public void testTimestampAndProcessingTimeQuerying() throws Exception {
        CoProcessOperator<Integer, String, String> operator =
                new CoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());

        TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
                new TwoInputStreamOperatorTestHarness<>(operator);

        testHarness.setup();
        testHarness.open();

        testHarness.setProcessingTime(17L);
        testHarness.processElement1(new StreamRecord<>(5));

        testHarness.setProcessingTime(42L);
        testHarness.processElement2(new StreamRecord<>("6"));

        // Records are created without a timestamp, so the function output contains "TS:null".
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
        expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.close();
    }

    /**
     * Emits, for every element on either input, the element followed by the current watermark
     * ({@code WM:}) and the element timestamp ({@code TS:}).
     */
    private static class WatermarkQueryingProcessFunction
            extends CoProcessFunction<Integer, String, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement1(Integer value, Context ctx, Collector<String> out)
                throws Exception {
            out.collect(value + "WM:" + ctx.timerService().currentWatermark()
                    + " TS:" + ctx.timestamp());
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out)
                throws Exception {
            out.collect(value + "WM:" + ctx.timerService().currentWatermark()
                    + " TS:" + ctx.timestamp());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            // Intentionally empty: this function never emits anything from a timer callback.
        }
    }

    /**
     * Emits, for every element on either input, the element followed by the current processing
     * time ({@code PT:}) and the element timestamp ({@code TS:}).
     */
    private static class ProcessingTimeQueryingProcessFunction
            extends CoProcessFunction<Integer, String, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement1(Integer value, Context ctx, Collector<String> out)
                throws Exception {
            out.collect(value + "PT:" + ctx.timerService().currentProcessingTime()
                    + " TS:" + ctx.timestamp());
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out)
                throws Exception {
            out.collect(value + "PT:" + ctx.timerService().currentProcessingTime()
                    + " TS:" + ctx.timestamp());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            // Intentionally empty: this function never emits anything from a timer callback.
        }
    }
}
