package org.apache.flink.test.streaming.runtime;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.SplittableIterator;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.class */
public class SortingBoundedInputITCase extends AbstractTestBaseJUnit4 {
    private static final WatermarkGenerator<Tuple2<Integer, Integer>> GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP = new WatermarkGenerator<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.test.streaming.runtime.SortingBoundedInputITCase.3
        public void onEvent(Tuple2<Integer, Integer> tuple2, long j, WatermarkOutput watermarkOutput) {
            if (j == 4) {
                watermarkOutput.emitWatermark(new Watermark(5L));
            } else if (j == 14) {
                watermarkOutput.emitWatermark(new Watermark(15L));
            }
        }

        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        }
    };

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase$AssertingOperator.class */
    private static class AssertingOperator extends AbstractStreamOperator<Long> implements OneInputStreamOperator<Tuple2<Integer, byte[]>, Long>, BoundedOneInput {
        private final Set<Integer> seenKeys;
        private long seenRecords;
        private Integer currentKey;

        private AssertingOperator() {
            this.seenKeys = new HashSet();
            this.seenRecords = 0L;
            this.currentKey = null;
        }

        public void processElement(StreamRecord<Tuple2<Integer, byte[]>> streamRecord) throws Exception {
            this.seenRecords++;
            Integer num = (Integer) ((Tuple2) streamRecord.getValue()).f0;
            if (Objects.equals(num, this.currentKey)) {
                return;
            }
            if (!this.seenKeys.add(num)) {
                Assert.fail("Received an out of order key: " + num);
            }
            this.currentKey = num;
        }

        public void endInput() {
            this.output.collect(new StreamRecord(Long.valueOf(this.seenRecords)));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase$AssertingThreeInputOperator.class */
    private static class AssertingThreeInputOperator extends AbstractStreamOperatorV2<Long> implements MultipleInputStreamOperator<Long>, BoundedMultiInput {
        private final Set<Integer> seenKeys;
        private long seenRecords;
        private Integer currentKey;
        private boolean input1Finished;
        private boolean input2Finished;
        private boolean input3Finished;
        static final /* synthetic */ boolean $assertionsDisabled;

        public AssertingThreeInputOperator(StreamOperatorParameters<Long> streamOperatorParameters, int i) {
            super(streamOperatorParameters, 3);
            this.seenKeys = new HashSet();
            this.seenRecords = 0L;
            this.currentKey = null;
            this.input1Finished = false;
            this.input2Finished = false;
            this.input3Finished = false;
            if (!$assertionsDisabled && i != 3) {
                throw new AssertionError();
            }
        }

        private void processElement(Tuple2<Integer, byte[]> tuple2) {
            this.seenRecords++;
            Integer num = (Integer) tuple2.f0;
            if (Objects.equals(num, this.currentKey)) {
                return;
            }
            if (!this.seenKeys.add(num)) {
                Assert.fail("Received an out of order key: " + num);
            }
            this.currentKey = num;
        }

        public void endInput(int i) {
            if (i == 1) {
                this.input1Finished = true;
            }
            if (i == 2) {
                this.input2Finished = true;
            }
            if (i == 3) {
                this.input3Finished = true;
            }
            if (this.input1Finished && this.input2Finished && this.input3Finished) {
                this.output.collect(new StreamRecord(Long.valueOf(this.seenRecords)));
            }
        }

        public List<Input> getInputs() {
            return Arrays.asList(new SingleInput(this::processElement), new SingleInput(this::processElement), new SingleInput(this::processElement));
        }

        static {
            $assertionsDisabled = !SortingBoundedInputITCase.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase$AssertingThreeInputOperatorFactory.class */
    private static class AssertingThreeInputOperatorFactory implements StreamOperatorFactory<Long> {
        private AssertingThreeInputOperatorFactory() {
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> streamOperatorParameters) {
            return new AssertingThreeInputOperator(streamOperatorParameters, 3);
        }

        public void setChainingStrategy(ChainingStrategy chainingStrategy) {
        }

        public ChainingStrategy getChainingStrategy() {
            return ChainingStrategy.NEVER;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return AssertingThreeInputOperator.class;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase$AssertingTwoInputOperator.class */
    private static class AssertingTwoInputOperator extends AbstractStreamOperator<Long> implements TwoInputStreamOperator<Tuple2<Integer, byte[]>, Tuple2<Integer, byte[]>, Long>, BoundedMultiInput {
        private final Set<Integer> seenKeys;
        private long seenRecords;
        private Integer currentKey;
        private boolean input1Finished;
        private boolean input2Finished;

        private AssertingTwoInputOperator() {
            this.seenKeys = new HashSet();
            this.seenRecords = 0L;
            this.currentKey = null;
            this.input1Finished = false;
            this.input2Finished = false;
        }

        public void processElement1(StreamRecord<Tuple2<Integer, byte[]>> streamRecord) {
            processElement(streamRecord);
        }

        public void processElement2(StreamRecord<Tuple2<Integer, byte[]>> streamRecord) {
            processElement(streamRecord);
        }

        private void processElement(StreamRecord<Tuple2<Integer, byte[]>> streamRecord) {
            this.seenRecords++;
            Integer num = (Integer) ((Tuple2) streamRecord.getValue()).f0;
            if (Objects.equals(num, this.currentKey)) {
                return;
            }
            if (!this.seenKeys.add(num)) {
                Assert.fail("Received an out of order key: " + num);
            }
            this.currentKey = num;
        }

        public void endInput(int i) {
            if (i == 1) {
                this.input1Finished = true;
            }
            if (i == 2) {
                this.input2Finished = true;
            }
            if (this.input1Finished && this.input2Finished) {
                this.output.collect(new StreamRecord(Long.valueOf(this.seenRecords)));
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase$InputGenerator.class */
    private static class InputGenerator extends SplittableIterator<Tuple2<Integer, byte[]>> {
        private final long numberOfRecords;
        private long generatedRecords;
        private final Random rnd;
        private final byte[] bytes;

        private InputGenerator(long j) {
            this.rnd = new Random();
            this.bytes = new byte[500];
            this.numberOfRecords = j;
            this.rnd.nextBytes(this.bytes);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public Iterator<Tuple2<Integer, byte[]>>[] split(int i) {
            long j = this.numberOfRecords / i;
            long j2 = this.numberOfRecords % i;
            Iterator<Tuple2<Integer, byte[]>>[] itArr = new Iterator[i];
            for (int i2 = 0; i2 < i - 1; i2++) {
                itArr[i2] = new InputGenerator(j);
            }
            itArr[i - 1] = new InputGenerator(j + j2);
            return itArr;
        }

        public int getMaximumNumberOfSplits() {
            return (int) Math.min(this.numberOfRecords, 2147483647L);
        }

        public boolean hasNext() {
            return this.generatedRecords < this.numberOfRecords;
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public Tuple2<Integer, byte[]> m996next() {
            if (!hasNext()) {
                return null;
            }
            this.generatedRecords++;
            return Tuple2.of(Integer.valueOf(this.rnd.nextInt(10)), this.bytes);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase$SingleInput.class */
    private static class SingleInput implements Input<Tuple2<Integer, byte[]>> {
        private final Consumer<Tuple2<Integer, byte[]>> recordConsumer;

        private SingleInput(Consumer<Tuple2<Integer, byte[]>> consumer) {
            this.recordConsumer = consumer;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void processElement(StreamRecord<Tuple2<Integer, byte[]>> streamRecord) throws Exception {
            this.recordConsumer.accept(streamRecord.getValue());
        }

        public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark watermark) {
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) {
        }

        public void setKeyContextElement(StreamRecord<Tuple2<Integer, byte[]>> streamRecord) {
        }

        public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        }
    }

    @Test
    public void testOneInputOperator() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        executionEnvironment.configure(configuration, getClass().getClassLoader());
        Assert.assertThat(Long.valueOf(CollectionUtil.iteratorToList(DataStreamUtils.collect(executionEnvironment.fromParallelCollection(new InputGenerator(1000000L), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO})).keyBy(tuple2 -> {
            return (Integer) tuple2.f0;
        }).transform("Asserting operator", BasicTypeInfo.LONG_TYPE_INFO, new AssertingOperator()))).stream().mapToLong(l -> {
            return l.longValue();
        }).sum()), CoreMatchers.equalTo(1000000L));
    }

    @Test
    public void testTwoInputOperator() {
        long j = 500000;
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        executionEnvironment.configure(configuration, getClass().getClassLoader());
        Assert.assertThat(Long.valueOf(CollectionUtil.iteratorToList(DataStreamUtils.collect(executionEnvironment.fromParallelCollection(new InputGenerator(j), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO})).connect(executionEnvironment.fromParallelCollection(new InputGenerator(j), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}))).keyBy(tuple2 -> {
            return (Integer) tuple2.f0;
        }, tuple22 -> {
            return (Integer) tuple22.f0;
        }).transform("Asserting operator", BasicTypeInfo.LONG_TYPE_INFO, new AssertingTwoInputOperator()))).stream().mapToLong(l -> {
            return l.longValue();
        }).sum()), CoreMatchers.equalTo(Long.valueOf(500000 * 2)));
    }

    @Test
    public void testThreeInputOperator() {
        long j = 500000;
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        executionEnvironment.configure(configuration, getClass().getClassLoader());
        KeyedStream keyBy = executionEnvironment.fromParallelCollection(new InputGenerator(j), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO})).keyBy(tuple2 -> {
            return (Integer) tuple2.f0;
        });
        KeyedStream keyBy2 = executionEnvironment.fromParallelCollection(new InputGenerator(j), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO})).keyBy(tuple22 -> {
            return (Integer) tuple22.f0;
        });
        KeyedStream keyBy3 = executionEnvironment.fromParallelCollection(new InputGenerator(j), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO})).keyBy(tuple23 -> {
            return (Integer) tuple23.f0;
        });
        KeyedMultipleInputTransformation keyedMultipleInputTransformation = new KeyedMultipleInputTransformation("Asserting operator", new AssertingThreeInputOperatorFactory(), BasicTypeInfo.LONG_TYPE_INFO, -1, BasicTypeInfo.INT_TYPE_INFO);
        keyedMultipleInputTransformation.addInput(keyBy.getTransformation(), keyBy.getKeySelector());
        keyedMultipleInputTransformation.addInput(keyBy2.getTransformation(), keyBy2.getKeySelector());
        keyedMultipleInputTransformation.addInput(keyBy3.getTransformation(), keyBy3.getKeySelector());
        executionEnvironment.addOperator(keyedMultipleInputTransformation);
        Assert.assertThat(Long.valueOf(CollectionUtil.iteratorToList(DataStreamUtils.collect(new DataStream(executionEnvironment, keyedMultipleInputTransformation))).stream().mapToLong(l -> {
            return l.longValue();
        }).sum()), CoreMatchers.equalTo(Long.valueOf(500000 * 3)));
    }

    @Test
    public void testBatchExecutionWithTimersOneInput() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        executionEnvironment.configure(configuration, getClass().getClassLoader());
        SingleOutputStreamOperator assignTimestampsAndWatermarks = executionEnvironment.fromData(new Tuple2[]{Tuple2.of(1, 3), Tuple2.of(1, 1), Tuple2.of(2, 1), Tuple2.of(1, 4), Tuple2.of(2, 3), Tuple2.of(1, 2), Tuple2.of(1, 13), Tuple2.of(1, 11), Tuple2.of(2, 14), Tuple2.of(1, 11)}).assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(context -> {
            return GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP;
        }).withTimestampAssigner((tuple2, j) -> {
            return ((Integer) tuple2.f1).intValue();
        }));
        final OutputTag outputTag = new OutputTag("late_elements", BasicTypeInfo.INT_TYPE_INFO);
        SingleOutputStreamOperator process = assignTimestampsAndWatermarks.map(tuple22 -> {
            return (Integer) tuple22.f0;
        }).keyBy(num -> {
            return num;
        }).process(new KeyedProcessFunction<Integer, Integer, Tuple3<Long, Integer, Integer>>() { // from class: org.apache.flink.test.streaming.runtime.SortingBoundedInputITCase.1
            private MapState<Long, Integer> countState;
            private ValueState<Long> previousTimestampState;

            public void open(OpenContext openContext) {
                this.countState = getRuntimeContext().getMapState(new MapStateDescriptor("sum", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
                this.previousTimestampState = getRuntimeContext().getState(new ValueStateDescriptor("previousTimestamp", BasicTypeInfo.LONG_TYPE_INFO));
            }

            public void processElement(Integer num2, KeyedProcessFunction<Integer, Integer, Tuple3<Long, Integer, Integer>>.Context context2, Collector<Tuple3<Long, Integer, Integer>> collector) throws Exception {
                Long timestamp = context2.timestamp();
                long longValue = ((timestamp.longValue() + 10) / 10) * 10;
                context2.timerService().registerEventTimeTimer(longValue);
                if (timestamp.longValue() < context2.timerService().currentWatermark()) {
                    context2.output(outputTag, num2);
                    return;
                }
                Assert.assertThat(timestamp, Matchers.greaterThanOrEqualTo((Long) Optional.ofNullable(this.previousTimestampState.value()).orElse(0L)));
                this.previousTimestampState.update(timestamp);
                this.countState.put(Long.valueOf(longValue), Integer.valueOf(((Integer) Optional.ofNullable(this.countState.get(Long.valueOf(longValue))).orElse(0)).intValue() + 1));
            }

            public void onTimer(long j2, KeyedProcessFunction<Integer, Integer, Tuple3<Long, Integer, Integer>>.OnTimerContext onTimerContext, Collector<Tuple3<Long, Integer, Integer>> collector) throws Exception {
                collector.collect(Tuple3.of(Long.valueOf(j2), onTimerContext.getCurrentKey(), this.countState.get(Long.valueOf(j2))));
                this.countState.remove(Long.valueOf(j2));
                onTimerContext.timerService().registerEventTimeTimer(j2 + 1);
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context2, Collector collector) throws Exception {
                processElement((Integer) obj, (KeyedProcessFunction<Integer, Integer, Tuple3<Long, Integer, Integer>>.Context) context2, (Collector<Tuple3<Long, Integer, Integer>>) collector);
            }
        });
        List iteratorToList = CollectionUtil.iteratorToList(DataStreamUtils.collect(process.getSideOutput(outputTag)));
        List iteratorToList2 = CollectionUtil.iteratorToList(DataStreamUtils.collect(process));
        Assert.assertTrue(iteratorToList.isEmpty());
        Assert.assertThat(iteratorToList2, CoreMatchers.equalTo(Arrays.asList(Tuple3.of(10L, 1, 4), Tuple3.of(20L, 1, 3), Tuple3.of(10L, 2, 2), Tuple3.of(20L, 2, 1))));
    }

    @Test
    public void testBatchExecutionWithTimersTwoInput() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        executionEnvironment.configure(configuration, getClass().getClassLoader());
        WatermarkStrategy withTimestampAssigner = WatermarkStrategy.forGenerator(context -> {
            return GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP;
        }).withTimestampAssigner((tuple2, j) -> {
            return ((Integer) tuple2.f1).intValue();
        });
        SingleOutputStreamOperator map = executionEnvironment.fromData(new Tuple2[]{Tuple2.of(1, 3), Tuple2.of(1, 1), Tuple2.of(2, 1), Tuple2.of(1, 4), Tuple2.of(2, 3), Tuple2.of(1, 2), Tuple2.of(1, 13), Tuple2.of(1, 11), Tuple2.of(2, 14), Tuple2.of(1, 11)}).assignTimestampsAndWatermarks(withTimestampAssigner).map(tuple22 -> {
            return (Integer) tuple22.f0;
        });
        SingleOutputStreamOperator map2 = executionEnvironment.fromData(new Tuple2[]{Tuple2.of(1, 3), Tuple2.of(1, 1), Tuple2.of(2, 1), Tuple2.of(1, 4), Tuple2.of(2, 3), Tuple2.of(1, 2), Tuple2.of(1, 13), Tuple2.of(1, 11), Tuple2.of(2, 14), Tuple2.of(1, 11)}).assignTimestampsAndWatermarks(withTimestampAssigner).map(tuple23 -> {
            return (Integer) tuple23.f0;
        });
        final OutputTag outputTag = new OutputTag("late_elements", BasicTypeInfo.INT_TYPE_INFO);
        SingleOutputStreamOperator process = map.connect(map2).keyBy(num -> {
            return num;
        }, num2 -> {
            return num2;
        }).process(new KeyedCoProcessFunction<Integer, Integer, Integer, Tuple3<Long, Integer, Integer>>() { // from class: org.apache.flink.test.streaming.runtime.SortingBoundedInputITCase.2
            private MapState<Long, Integer> countState;
            private ValueState<Long> previousTimestampState;

            public void open(OpenContext openContext) {
                this.countState = getRuntimeContext().getMapState(new MapStateDescriptor("sum", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
                this.previousTimestampState = getRuntimeContext().getState(new ValueStateDescriptor("previousTimestamp", BasicTypeInfo.LONG_TYPE_INFO));
            }

            public void processElement1(Integer num3, KeyedCoProcessFunction<Integer, Integer, Integer, Tuple3<Long, Integer, Integer>>.Context context2, Collector<Tuple3<Long, Integer, Integer>> collector) throws Exception {
                processElement(num3, context2);
            }

            public void processElement2(Integer num3, KeyedCoProcessFunction<Integer, Integer, Integer, Tuple3<Long, Integer, Integer>>.Context context2, Collector<Tuple3<Long, Integer, Integer>> collector) throws Exception {
                processElement(num3, context2);
            }

            private void processElement(Integer num3, KeyedCoProcessFunction<Integer, Integer, Integer, Tuple3<Long, Integer, Integer>>.Context context2) throws Exception {
                Long timestamp = context2.timestamp();
                long longValue = ((timestamp.longValue() + 10) / 10) * 10;
                context2.timerService().registerEventTimeTimer(longValue);
                if (timestamp.longValue() < context2.timerService().currentWatermark()) {
                    context2.output(outputTag, num3);
                    return;
                }
                Assert.assertThat(timestamp, Matchers.greaterThanOrEqualTo((Long) Optional.ofNullable(this.previousTimestampState.value()).orElse(0L)));
                this.previousTimestampState.update(timestamp);
                this.countState.put(Long.valueOf(longValue), Integer.valueOf(((Integer) Optional.ofNullable(this.countState.get(Long.valueOf(longValue))).orElse(0)).intValue() + 1));
            }

            public void onTimer(long j2, KeyedCoProcessFunction<Integer, Integer, Integer, Tuple3<Long, Integer, Integer>>.OnTimerContext onTimerContext, Collector<Tuple3<Long, Integer, Integer>> collector) throws Exception {
                collector.collect(Tuple3.of(Long.valueOf(j2), onTimerContext.getCurrentKey(), this.countState.get(Long.valueOf(j2))));
                this.countState.remove(Long.valueOf(j2));
                onTimerContext.timerService().registerEventTimeTimer(j2 + 1);
            }

            public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context2, Collector collector) throws Exception {
                processElement2((Integer) obj, (KeyedCoProcessFunction<Integer, Integer, Integer, Tuple3<Long, Integer, Integer>>.Context) context2, (Collector<Tuple3<Long, Integer, Integer>>) collector);
            }

            public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context2, Collector collector) throws Exception {
                processElement1((Integer) obj, (KeyedCoProcessFunction<Integer, Integer, Integer, Tuple3<Long, Integer, Integer>>.Context) context2, (Collector<Tuple3<Long, Integer, Integer>>) collector);
            }
        });
        List iteratorToList = CollectionUtil.iteratorToList(DataStreamUtils.collect(process.getSideOutput(outputTag)));
        List iteratorToList2 = CollectionUtil.iteratorToList(DataStreamUtils.collect(process));
        Assert.assertTrue(iteratorToList.isEmpty());
        Assert.assertThat(iteratorToList2, CoreMatchers.equalTo(Arrays.asList(Tuple3.of(10L, 1, 8), Tuple3.of(20L, 1, 6), Tuple3.of(10L, 2, 4), Tuple3.of(20L, 2, 2))));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2007862564:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersTwoInput$84f51f8b$1")) {
                    z = 11;
                    break;
                }
                break;
            case -436079930:
                if (implMethodName.equals("lambda$testOneInputOperator$8ef3bd2b$1")) {
                    z = true;
                    break;
                }
                break;
            case 48137368:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersOneInput$400354f4$1")) {
                    z = false;
                    break;
                }
                break;
            case 212706565:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersOneInput$ec2c70e6$1")) {
                    z = 3;
                    break;
                }
                break;
            case 385223806:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersTwoInput$400354f4$1")) {
                    z = 5;
                    break;
                }
                break;
            case 851662929:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersTwoInput$b23fb0d2$1")) {
                    z = 13;
                    break;
                }
                break;
            case 851721550:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersTwoInput$b23fb0f1$1")) {
                    z = 8;
                    break;
                }
                break;
            case 1058944172:
                if (implMethodName.equals("lambda$testTwoInputOperator$8ef3bd2b$1")) {
                    z = 12;
                    break;
                }
                break;
            case 1058944173:
                if (implMethodName.equals("lambda$testTwoInputOperator$8ef3bd2b$2")) {
                    z = 14;
                    break;
                }
                break;
            case 1586409577:
                if (implMethodName.equals("lambda$testThreeInputOperator$dd32ca71$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1586468198:
                if (implMethodName.equals("lambda$testThreeInputOperator$dd32ca90$1")) {
                    z = 6;
                    break;
                }
                break;
            case 1587711732:
                if (implMethodName.equals("lambda$testThreeInputOperator$dd32caaf$1")) {
                    z = 15;
                    break;
                }
                break;
            case 1776539179:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersOneInput$93498fe7$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1950018294:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersOneInput$84f51f8b$1")) {
                    z = 10;
                    break;
                }
                break;
            case 2113625617:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersTwoInput$93498fe7$1")) {
                    z = 9;
                    break;
                }
                break;
            case 2113625618:
                if (implMethodName.equals("lambda$testBatchExecutionWithTimersTwoInput$93498fe7$2")) {
                    z = 7;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context -> {
                        return GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple2 -> {
                        return (Integer) tuple2.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Object;")) {
                    return tuple22 -> {
                        return (Integer) tuple22.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple222 -> {
                        return (Integer) tuple222.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context2 -> {
                        return GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Object;")) {
                    return tuple223 -> {
                        return (Integer) tuple223.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num2 -> {
                        return num2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple23 -> {
                        return (Integer) tuple23.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num3 -> {
                        return num3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple24, j) -> {
                        return ((Integer) tuple24.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple25, j2) -> {
                        return ((Integer) tuple25.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple26 -> {
                        return (Integer) tuple26.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple224 -> {
                        return (Integer) tuple224.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple225 -> {
                        return (Integer) tuple225.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Object;")) {
                    return tuple232 -> {
                        return (Integer) tuple232.f0;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
