package org.apache.flink.streaming.api.operators;

import java.lang.invoke.SerializedLambda;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.class */
public class KeyedProcessOperatorTest extends TestLogger {

    /**
     * JUnit rule used to declare the exception a test is expected to throw. It is initialised
     * with {@code ExpectedException.none()}, so a test only expects an exception after it calls
     * {@code expect(...)} on this rule (as {@code testNullOutputTagRefusal} does).
     */
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Process function that registers and deletes timers in both time domains for every element
     * and reports, for each timer that fires, which time domain it belongs to.
     *
     * <p>The explicit bridge overload of {@code processElement(Object, ...)} that the decompiler
     * emitted is intentionally absent: the compiler generates that bridge itself, and declaring
     * it by hand produces a name clash.
     */
    private static class BothTriggeringFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
        private static final long serialVersionUID = 1L;

        /** Key that every firing timer is expected to be scoped to. */
        private final Integer expectedKey;

        /**
         * @param expectedKey the key asserted against the current key in {@link #onTimer}
         */
        public BothTriggeringFlatMapFunction(Integer expectedKey) {
            this.expectedKey = expectedKey;
        }

        /**
         * Registers processing-time timers at 3 and 5 and event-time timers at 4 and 6, then
         * deletes the timers at 3 and 4 again. Emits nothing.
         *
         * @param value the incoming element (unused)
         * @param ctx context giving access to the timer service
         * @param out collector for output records (unused)
         */
        public void processElement(Integer value, KeyedProcessFunction<Integer, Integer, String>.Context ctx, Collector<String> out) throws Exception {
            final TimerService timers = ctx.timerService();

            timers.registerProcessingTimeTimer(3L);
            timers.registerEventTimeTimer(4L);
            timers.registerProcessingTimeTimer(5L);
            timers.registerEventTimeTimer(6L);

            // Remove the earlier timer of each domain again; the timers at 5 and 6 are not deleted.
            timers.deleteProcessingTimeTimer(3L);
            timers.deleteEventTimeTimer(4L);
        }

        /**
         * Asserts that the timer fires for the expected key and emits {@code "EVENT:1777"} for an
         * event-time timer or {@code "PROC:1777"} otherwise.
         *
         * @param timestamp the timestamp of the firing timer (unused)
         * @param ctx timer context providing the current key and the time domain
         * @param out collector receiving the marker string
         */
        public void onTimer(long timestamp, KeyedProcessFunction<Integer, Integer, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assert.assertEquals(this.expectedKey, ctx.getCurrentKey());
            out.collect(TimeDomain.EVENT_TIME.equals(ctx.timeDomain()) ? "EVENT:1777" : "PROC:1777");
        }
    }

    /**
     * Key selector that uses each record itself as its key.
     *
     * @param <T> type of the records, which is also the key type
     */
    private static class IdentityKeySelector<T> implements KeySelector<T, T> {
        private static final long serialVersionUID = 1L;

        /**
         * @param value the record to extract a key from
         * @return {@code value} itself
         */
        public T getKey(T value) throws Exception {
            return value;
        }
    }

    /**
     * Process function that emits every element to a {@code null} output tag. Used by
     * {@code testNullOutputTagRefusal}, which expects an {@link IllegalArgumentException} when an
     * element is processed.
     *
     * <p>The decompiler-emitted bridge overload of {@code processElement(Object, ...)} is omitted
     * because the compiler generates it; declaring it explicitly causes a name clash.
     */
    private static class NullOutputTagEmittingProcessFunction extends KeyedProcessFunction<Integer, Integer, String> {

        /**
         * Forwards {@code value} to a {@code null} side-output tag.
         *
         * @param value the incoming element
         * @param ctx context used to emit to the side output
         * @param out collector for the main output (unused)
         */
        public void processElement(Integer value, KeyedProcessFunction<Integer, Integer, String>.Context ctx, Collector<String> out) throws Exception {
            // The typed cast only fixes the generic parameter of output(...); the tag is still null.
            ctx.output((OutputTag<Integer>) null, value);
        }
    }

    /**
     * Process function that emits, for every element, a string containing the element, the
     * current time of the configured time domain, and the element's timestamp.
     *
     * <p>The decompiler-emitted bridge overload of {@code processElement(Object, ...)} is omitted
     * because the compiler generates it; declaring it explicitly causes a name clash.
     */
    private static class QueryingFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
        private static final long serialVersionUID = 1L;

        /** Selects whether the watermark or the processing time is reported. */
        private final TimeDomain expectedTimeDomain;

        /**
         * @param timeDomain {@code EVENT_TIME} to report the current watermark, anything else to
         *     report the current processing time
         */
        public QueryingFlatMapFunction(TimeDomain timeDomain) {
            this.expectedTimeDomain = timeDomain;
        }

        /**
         * Emits {@code <value>TIME:<current time> TS:<element timestamp>}, where the current time
         * is the watermark in event time and the processing time otherwise.
         *
         * @param value the incoming element
         * @param ctx context providing the timer service and the element timestamp
         * @param out collector receiving the formatted string
         */
        public void processElement(Integer value, KeyedProcessFunction<Integer, Integer, String>.Context ctx, Collector<String> out) throws Exception {
            final long currentTime = this.expectedTimeDomain.equals(TimeDomain.EVENT_TIME)
                    ? ctx.timerService().currentWatermark()
                    : ctx.timerService().currentProcessingTime();
            out.collect(value + "TIME:" + currentTime + " TS:" + ctx.timestamp());
        }

        /** No-op: this function never registers timers. */
        public void onTimer(long timestamp, KeyedProcessFunction<Integer, Integer, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }

    /**
     * Process function that writes each element to the main output (prefixed with {@code "IN:"})
     * and to two side outputs, once as an {@code Integer} and once as a {@code Long}.
     *
     * <p>The decompiler-emitted bridge overload of {@code processElement(Object, ...)} is omitted
     * because the compiler generates it; declaring it explicitly causes a name clash.
     */
    private static class SideOutputProcessFunction extends KeyedProcessFunction<Integer, Integer, String> {
        // Both tags are created as anonymous subclasses, as in the original code.
        // NOTE(review): presumably so the element type can be recovered from the generic
        // superclass - confirm against the OutputTag documentation.
        static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new OutputTag<Integer>("int-out") {
        };
        static final OutputTag<Long> LONG_OUTPUT_TAG = new OutputTag<Long>("long-out") {
        };

        /**
         * Emits {@code "IN:" + value} to the main output, {@code value} to
         * {@link #INTEGER_OUTPUT_TAG} and {@code value} widened to a long to
         * {@link #LONG_OUTPUT_TAG}.
         *
         * @param value the incoming element
         * @param ctx context used to emit to the side outputs
         * @param out collector for the main output
         */
        public void processElement(Integer value, KeyedProcessFunction<Integer, Integer, String>.Context ctx, Collector<String> out) throws Exception {
            out.collect("IN:" + value);
            ctx.output(INTEGER_OUTPUT_TAG, value);
            ctx.output(LONG_OUTPUT_TAG, value.longValue());
        }
    }

    /**
     * Process function that forwards every element and registers a timer five time units after
     * the current time of the configured time domain. When a timer fires it checks the key and
     * time domain and emits {@code 1777}.
     *
     * <p>The decompiler-emitted bridge overload of {@code processElement(Object, ...)} is omitted
     * because the compiler generates it; declaring it explicitly causes a name clash.
     */
    private static class TriggeringFlatMapFunction extends KeyedProcessFunction<Integer, Integer, Integer> {
        private static final long serialVersionUID = 1L;

        /** Time domain in which timers are registered and are expected to fire. */
        private final TimeDomain expectedTimeDomain;

        /** Key that every firing timer is expected to be scoped to. */
        private final Integer expectedKey;

        /**
         * @param timeDomain time domain used for registering timers and asserted in {@link #onTimer}
         * @param expectedKey key asserted against the current key in {@link #onTimer}
         */
        public TriggeringFlatMapFunction(TimeDomain timeDomain, Integer expectedKey) {
            this.expectedTimeDomain = timeDomain;
            this.expectedKey = expectedKey;
        }

        /**
         * Forwards {@code value} and registers a timer at "now + 5", where "now" is the current
         * watermark in event time and the current processing time otherwise.
         *
         * @param value the incoming element
         * @param ctx context giving access to the timer service
         * @param out collector receiving the forwarded element
         */
        public void processElement(Integer value, KeyedProcessFunction<Integer, Integer, Integer>.Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value);

            final TimerService timers = ctx.timerService();
            if (this.expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
                timers.registerEventTimeTimer(timers.currentWatermark() + 5);
            } else {
                timers.registerProcessingTimeTimer(timers.currentProcessingTime() + 5);
            }
        }

        /**
         * Asserts the current key and the time domain of the firing timer, then emits {@code 1777}.
         *
         * @param timestamp the timestamp of the firing timer (unused)
         * @param ctx timer context providing the current key and the time domain
         * @param out collector receiving the marker value
         */
        public void onTimer(long timestamp, KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            Assert.assertEquals(this.expectedKey, ctx.getCurrentKey());
            Assert.assertEquals(this.expectedTimeDomain, ctx.timeDomain());
            out.collect(1777);
        }
    }

    /**
     * Process function that combines keyed state with timers.
     *
     * <p>The first element seen for a key is emitted as {@code "INPUT:<value>"}, stored in state,
     * and a timer is registered at "now + 5". If state already holds a value when an element
     * arrives, the state is cleared and the timer at "now + 4" is deleted instead. A firing timer
     * emits {@code "STATE:<stored value>"}. "Now" is the current watermark in event time and the
     * current processing time otherwise.
     *
     * <p>The decompiler-emitted bridge overload of {@code processElement(Object, ...)} is omitted
     * because the compiler generates it; declaring it explicitly causes a name clash.
     */
    private static class TriggeringStatefulFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
        private static final long serialVersionUID = 1L;

        /** Descriptor of the state holding the element seen for the current key. */
        private final ValueStateDescriptor<Integer> state = new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);

        /** Time domain in which timers are registered, deleted and expected to fire. */
        private final TimeDomain expectedTimeDomain;

        /**
         * @param timeDomain time domain used for all timer operations and asserted in {@link #onTimer}
         */
        public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
            this.expectedTimeDomain = timeDomain;
        }

        /**
         * Handles one element as described on the class: either clears the state and deletes a
         * timer, or emits the element, stores it and registers a timer.
         *
         * @param value the incoming element
         * @param ctx context giving access to the timer service
         * @param out collector receiving {@code "INPUT:<value>"} for first-seen elements
         */
        public void processElement(Integer value, KeyedProcessFunction<Integer, Integer, String>.Context ctx, Collector<String> out) throws Exception {
            final TimerService timers = ctx.timerService();
            final ValueState<Integer> seen = getRuntimeContext().getState(this.state);
            final boolean eventTime = this.expectedTimeDomain.equals(TimeDomain.EVENT_TIME);

            if (seen.value() != null) {
                // State already holds a value: clear it, delete the timer at "now + 4", emit nothing.
                seen.clear();
                if (eventTime) {
                    timers.deleteEventTimeTimer(timers.currentWatermark() + 4);
                } else {
                    timers.deleteProcessingTimeTimer(timers.currentProcessingTime() + 4);
                }
                return;
            }

            out.collect("INPUT:" + value);
            seen.update(value);
            if (eventTime) {
                timers.registerEventTimeTimer(timers.currentWatermark() + 5);
            } else {
                timers.registerProcessingTimeTimer(timers.currentProcessingTime() + 5);
            }
        }

        /**
         * Asserts the time domain of the firing timer and emits the stored state value as
         * {@code "STATE:<value>"}.
         *
         * @param timestamp the timestamp of the firing timer (unused)
         * @param ctx timer context providing the time domain
         * @param out collector receiving the state string
         */
        public void onTimer(long timestamp, KeyedProcessFunction<Integer, Integer, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assert.assertEquals(this.expectedTimeDomain, ctx.timeDomain());
            out.collect("STATE:" + getRuntimeContext().getState(this.state).value());
        }
    }

    /**
     * Verifies that the key reported by the context inside {@code processElement} equals the key
     * extracted from the element ({@code f0} of the tuple), and that {@code f1} is forwarded with
     * the element's timestamp.
     */
    @Test
    public void testKeyQuerying() throws Exception {

        /** Asserts the current key against the tuple's first field and forwards the second. */
        class KeyQueryingProcessFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, String> {
            public void processElement(Tuple2<Integer, String> value, KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {
                Assert.assertTrue("Did not get expected key.", ctx.getCurrentKey().equals(value.f0));
                out.collect(value.f1);
            }
        }

        KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
                new KeyedProcessOperator<>(new KeyQueryingProcessFunction());

        // try-with-resources replaces the hand-expanded close/addSuppressed logic and closes the
        // harness on both the success and the failure path.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, in -> in.f0, BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"), 12L));
            testHarness.processElement(new StreamRecord<>(Tuple2.of(42, "42"), 13L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("5", 12L));
            expectedOutput.add(new StreamRecord<>("42", 13L));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that, in event time, {@code processElement} observes the most recently processed
     * watermark through the timer service together with the timestamp of the current element.
     */
    @Test
    public void testTimestampAndWatermarkQuerying() throws Exception {
        KeyedProcessOperator<Integer, Integer, String> operator =
                new KeyedProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));

        // try-with-resources closes the harness even when a step or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processWatermark(new Watermark(17L));
            testHarness.processElement(new StreamRecord<>(5, 12L));

            testHarness.processWatermark(new Watermark(42L));
            testHarness.processElement(new StreamRecord<>(6, 13L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new Watermark(17L));
            expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
            expectedOutput.add(new Watermark(42L));
            expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that, in processing time, {@code processElement} observes the processing time set
     * on the harness, and that elements created without a timestamp report a {@code null}
     * timestamp.
     */
    @Test
    public void testTimestampAndProcessingTimeQuerying() throws Exception {
        KeyedProcessOperator<Integer, Integer, String> operator =
                new KeyedProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));

        // try-with-resources closes the harness even when a step or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.setProcessingTime(17L);
            testHarness.processElement(new StreamRecord<>(5));

            testHarness.setProcessingTime(42L);
            testHarness.processElement(new StreamRecord<>(6));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
            expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that an event-time timer registered in {@code processElement} fires once the
     * watermark reaches it: the timer output ({@code 1777} at timestamp 5) is expected before the
     * watermark 5 in the output.
     */
    @Test
    public void testEventTimeTimers() throws Exception {
        KeyedProcessOperator<Integer, Integer, Integer> operator =
                new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME, 17));

        // try-with-resources closes the harness even when a step or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processWatermark(new Watermark(0L));
            // With the watermark at 0 the function registers its timer at 0 + 5.
            testHarness.processElement(new StreamRecord<>(17, 42L));
            testHarness.processWatermark(new Watermark(5L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new Watermark(0L));
            expectedOutput.add(new StreamRecord<>(17, 42L));
            expectedOutput.add(new StreamRecord<>(1777, 5L));
            expectedOutput.add(new Watermark(5L));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that a processing-time timer registered in {@code processElement} fires once the
     * harness's processing time is advanced to 5, emitting {@code 1777} without a timestamp.
     */
    @Test
    public void testProcessingTimeTimers() throws Exception {
        KeyedProcessOperator<Integer, Integer, Integer> operator =
                new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME, 17));

        // try-with-resources closes the harness even when a step or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement(new StreamRecord<>(17));
            testHarness.setProcessingTime(5L);

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>(17));
            expectedOutput.add(new StreamRecord<>(1777));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that event-time timers see the keyed state of the key they were registered for,
     * and that a deleted timer does not fire: key 13 is sent twice, so its state is cleared and
     * its timer deleted, and only {@code STATE:17} and {@code STATE:42} are expected.
     */
    @Test
    public void testEventTimeTimerWithState() throws Exception {
        KeyedProcessOperator<Integer, Integer, String> operator =
                new KeyedProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));

        // try-with-resources closes the harness even when a step or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processWatermark(new Watermark(1L));
            testHarness.processElement(new StreamRecord<>(17, 0L)); // registers timer at 1 + 5
            testHarness.processElement(new StreamRecord<>(13, 0L)); // registers timer at 1 + 5

            testHarness.processWatermark(new Watermark(2L));
            testHarness.processElement(new StreamRecord<>(42, 1L)); // registers timer at 2 + 5
            testHarness.processElement(new StreamRecord<>(13, 1L)); // deletes timer at 2 + 4

            testHarness.processWatermark(new Watermark(6L));
            testHarness.processWatermark(new Watermark(7L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new Watermark(1L));
            expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
            expectedOutput.add(new StreamRecord<>("INPUT:13", 0L));
            expectedOutput.add(new Watermark(2L));
            expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
            expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
            expectedOutput.add(new Watermark(6L));
            expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
            expectedOutput.add(new Watermark(7L));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that processing-time timers see the keyed state of the key they were registered
     * for, and that a deleted timer does not fire: key 13 is sent twice, so its state is cleared
     * and its timer deleted, and only {@code STATE:17} and {@code STATE:42} are expected.
     */
    @Test
    public void testProcessingTimeTimerWithState() throws Exception {
        KeyedProcessOperator<Integer, Integer, String> operator =
                new KeyedProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));

        // try-with-resources closes the harness even when a step or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.setProcessingTime(1L);
            testHarness.processElement(new StreamRecord<>(17)); // registers timer at 1 + 5
            testHarness.processElement(new StreamRecord<>(13)); // registers timer at 1 + 5

            testHarness.setProcessingTime(2L);
            testHarness.processElement(new StreamRecord<>(13)); // deletes timer at 2 + 4
            testHarness.processElement(new StreamRecord<>(42)); // registers timer at 2 + 5

            testHarness.setProcessingTime(6L);
            testHarness.setProcessingTime(7L);

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("INPUT:17"));
            expectedOutput.add(new StreamRecord<>("INPUT:13"));
            expectedOutput.add(new StreamRecord<>("INPUT:42"));
            expectedOutput.add(new StreamRecord<>("STATE:17"));
            expectedOutput.add(new StreamRecord<>("STATE:42"));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that timers survive a snapshot/restore cycle: timers are registered in a first
     * harness, a snapshot is taken, and a second harness initialised from that snapshot is
     * expected to fire the remaining processing-time and event-time timers.
     */
    @Test
    public void testSnapshotAndRestore() throws Exception {
        final OperatorSubtaskState snapshot;

        // First harness: register the timers and snapshot. try-with-resources closes the harness
        // even if one of the steps fails.
        KeyedProcessOperator<Integer, Integer, String> operator =
                new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction(5));
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement(new StreamRecord<>(5, 12L));

            snapshot = testHarness.snapshot(0L, 0L);
        }

        // Second harness: restore from the snapshot and advance both time domains.
        KeyedProcessOperator<Integer, Integer, String> restoredOperator =
                new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction(5));
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> restoredHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        restoredOperator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            restoredHarness.setup();
            restoredHarness.initializeState(snapshot);
            restoredHarness.open();

            restoredHarness.setProcessingTime(5L);
            restoredHarness.processWatermark(new Watermark(6L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("PROC:1777"));
            expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
            expectedOutput.add(new Watermark(6L));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, restoredHarness.getOutput());
        }
    }

    /**
     * Verifies that emitting to a {@code null} output tag is rejected: processing an element with
     * {@code NullOutputTagEmittingProcessFunction} is expected to throw an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testNullOutputTagRefusal() throws Exception {
        KeyedProcessOperator<Integer, Integer, String> operator =
                new KeyedProcessOperator<>(new NullOutputTagEmittingProcessFunction());

        // try-with-resources also covers setup/open, and keeps the expected exception as the
        // primary one if close() fails as well (a finally block would let close() mask it).
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.setProcessingTime(17L);

            this.expectedException.expect(IllegalArgumentException.class);
            testHarness.processElement(new StreamRecord<>(5));
        }
    }

    /**
     * Verifies that {@code SideOutputProcessFunction} writes each element to the main output and
     * to both side outputs, each record carrying the timestamp of the input element.
     */
    @Test
    public void testSideOutput() throws Exception {
        KeyedProcessOperator<Integer, Integer, String> operator =
                new KeyedProcessOperator<>(new SideOutputProcessFunction());

        // try-with-resources closes the harness even when a step or an assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement(new StreamRecord<>(42, 17L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("IN:42", 17L));
            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

            ConcurrentLinkedQueue<StreamRecord<Integer>> expectedIntSideOutput = new ConcurrentLinkedQueue<>();
            expectedIntSideOutput.add(new StreamRecord<>(42, 17L));
            ConcurrentLinkedQueue<StreamRecord<Integer>> intSideOutput =
                    testHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
            TestHarnessUtil.assertOutputEquals("Side output was not correct.", expectedIntSideOutput, intSideOutput);

            ConcurrentLinkedQueue<StreamRecord<Long>> expectedLongSideOutput = new ConcurrentLinkedQueue<>();
            expectedLongSideOutput.add(new StreamRecord<>(42L, 17L));
            ConcurrentLinkedQueue<StreamRecord<Long>> longSideOutput =
                    testHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
            TestHarnessUtil.assertOutputEquals("Side output was not correct.", expectedLongSideOutput, longSideOutput);
        }
    }

    /**
     * Lambda deserialization hook: given a {@link SerializedLambda}, recreates the key-selector
     * lambda used in {@code testKeyQuerying} when every recorded attribute (implementation method
     * name and kind, functional interface, method signatures, implementing class) matches, and
     * throws {@link IllegalArgumentException} otherwise.
     *
     * <p>NOTE(review): this method is marked synthetic and its body is not valid Java as written
     * ({@code boolean z = -1}, a {@code switch} over a {@code boolean}). It is presumably a
     * decompiler rendering of compiler-generated bytecode; in hand-maintained source it should
     * most likely be deleted so the compiler can generate it again - TODO confirm.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: map the implementation method name (via its hash code) to a small index.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 993308798:
                if (implMethodName.equals("lambda$testKeyQuerying$aeea360d$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: validate the remaining lambda metadata for the selected index.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple2 -> {
                        return (Integer) tuple2.f0;
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
