package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/SideOutputITCase.class */
public class SideOutputITCase extends AbstractTestBase implements Serializable {

    /**
     * JUnit rule for tests that expect an exception. Declared {@code transient} so that it is
     * not part of the serialized form of this {@link Serializable} test class.
     */
    @Rule
    public transient ExpectedException expectedException = ExpectedException.none();

    /**
     * Input elements fed into the pipelines via {@code fromCollection(elements)}.
     *
     * <p>NOTE(review): nothing in the visible part of this file fills the list, while the
     * assertions expect the values 1 through 5 - presumably it is populated by a static
     * initializer elsewhere in the class; confirm.
     */
    static List<Integer> elements = new ArrayList<>();

    /* renamed from: org.apache.flink.test.streaming.runtime.SideOutputITCase$1WatermarkReifier, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SideOutputITCase$1WatermarkReifier.class */
    class C1WatermarkReifier extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1;

        C1WatermarkReifier() {
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.output.collect(new StreamRecord("E:" + ((String) streamRecord.getValue())));
        }

        public void processWatermark(Watermark watermark) throws Exception {
            super.processWatermark(watermark);
            this.output.collect(new StreamRecord("WM:" + watermark.getTimestamp()));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SideOutputITCase$TestKeySelector.class */
    private static class TestKeySelector implements KeySelector<Integer, Integer> {
        private static final long serialVersionUID = 1;

        private TestKeySelector() {
        }

        public Integer getKey(Integer num) throws Exception {
            return num;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SideOutputITCase$TestWatermarkAssigner.class */
    private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Integer> {
        private static final long serialVersionUID = 1;

        private TestWatermarkAssigner() {
        }

        @Nullable
        public Watermark checkAndGetNextWatermark(Integer num, long j) {
            return new Watermark(j);
        }

        public long extractTimestamp(Integer num, long j) {
            return Long.valueOf(num.intValue()).longValue();
        }
    }

    /**
     * Verifies that watermarks are forwarded to the main output and to side outputs.
     *
     * <p>The source emits five timestamped elements and the watermarks 0 and 2. The process
     * function forwards each element to the main output and emits {@code "sideout-<n>"} to the
     * "side" tag; nothing is ever emitted to the "other-side" tag. Each of the three streams is
     * passed through a {@link C1WatermarkReifier} so that watermarks show up as {@code "WM:<ts>"}
     * records in the sinks. Every expected watermark appears three times, matching the
     * parallelism of 3.
     */
    @Test
    public void testWatermarkForwarding() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side") {
        };
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("other-side") {
        };

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<String> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(3);

        SingleOutputStreamOperator<Integer> passThroughStream = env
                .addSource(new SourceFunction<Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                        ctx.collectWithTimestamp(1, 0L);
                        ctx.emitWatermark(new Watermark(0L));
                        ctx.collectWithTimestamp(2, 1L);
                        ctx.collectWithTimestamp(5, 2L);
                        ctx.emitWatermark(new Watermark(2L));
                        ctx.collectWithTimestamp(3, 3L);
                        ctx.collectWithTimestamp(4, 4L);
                    }

                    @Override
                    public void cancel() {
                    }
                })
                .process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag1, "sideout-" + value);
                    }
                });

        passThroughStream
                .getSideOutput(sideOutputTag1)
                .transform("ReifyWatermarks", BasicTypeInfo.STRING_TYPE_INFO, new C1WatermarkReifier())
                .addSink(sideOutputResultSink1);
        passThroughStream
                .getSideOutput(sideOutputTag2)
                .transform("ReifyWatermarks", BasicTypeInfo.STRING_TYPE_INFO, new C1WatermarkReifier())
                .addSink(sideOutputResultSink2);
        passThroughStream
                .map(new MapFunction<Integer, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String map(Integer value) throws Exception {
                        return value.toString();
                    }
                })
                .transform("ReifyWatermarks", BasicTypeInfo.STRING_TYPE_INFO, new C1WatermarkReifier())
                .addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList(
                        "E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5",
                        "WM:0", "WM:0", "WM:0",
                        "WM:2", "WM:2", "WM:2",
                        "WM:9223372036854775807", "WM:9223372036854775807", "WM:9223372036854775807"),
                sideOutputResultSink1.getSortedResult());

        // The original test asserted the first sink a second time here and never looked at the
        // "other-side" sink. No element is emitted to that tag, so only the forwarded
        // watermarks are expected to arrive there.
        Assert.assertEquals(
                Arrays.asList(
                        "WM:0", "WM:0", "WM:0",
                        "WM:2", "WM:2", "WM:2",
                        "WM:9223372036854775807", "WM:9223372036854775807", "WM:9223372036854775807"),
                sideOutputResultSink2.getSortedResult());

        Assert.assertEquals(
                Arrays.asList(
                        "E:1", "E:2", "E:3", "E:4", "E:5",
                        "WM:0", "WM:0", "WM:0",
                        "WM:2", "WM:2", "WM:2",
                        "WM:9223372036854775807", "WM:9223372036854775807", "WM:9223372036854775807"),
                resultSink.getSortedResult());
    }

    /**
     * Verifies that one side output can be consumed by two sinks: both side-output sinks must
     * see every {@code "sideout-<n>"} record while the main output still carries the elements.
     */
    @Test
    public void testSideOutputWithMultipleConsumers() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {
        };

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Only the typed override is declared; the compiler generates the bridge method itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag, "sideout-" + value);
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Same scenario as two sinks consuming one side output, but with object reuse enabled on
     * the execution config.
     */
    @Test
    public void testSideOutputWithMultipleConsumersWithObjectReuse() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {
        };

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setParallelism(3);

        // Only the typed override is declared; the compiler generates the bridge method itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag, "sideout-" + value);
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that one operator can emit to side outputs of different types: a string side
     * output receiving {@code "sideout-<n>"} and an integer side output receiving the constant
     * 13 for every element. Object reuse is enabled for this test.
     */
    @Test
    public void testDifferentSideOutputTypes() throws Exception {
        final OutputTag<String> stringSideOutputTag = new OutputTag<String>("string") {
        };
        final OutputTag<Integer> intSideOutputTag = new OutputTag<Integer>("int") {
        };

        TestListResultSink<String> stringSideOutputSink = new TestListResultSink<>();
        TestListResultSink<Integer> intSideOutputSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setParallelism(3);

        // Only the typed override is declared; the compiler generates the bridge method itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                        ctx.output(stringSideOutputTag, "sideout-" + value);
                        ctx.output(intSideOutputTag, 13);
                    }
                });

        passThroughStream.getSideOutput(stringSideOutputTag).addSink(stringSideOutputSink);
        passThroughStream.getSideOutput(intSideOutputTag).addSink(intSideOutputSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                stringSideOutputSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(13, 13, 13, 13, 13), intSideOutputSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that two output tags sharing the id "side" but differing in type are rejected:
     * after the string-typed side output has been requested, wiring up the integer-typed one is
     * expected to throw {@link UnsupportedOperationException}. The job is never executed.
     */
    @Test
    public void testSideOutputNameClash() throws Exception {
        final OutputTag<String> stringSideOutputTag = new OutputTag<String>("side") {
        };
        final OutputTag<Integer> intSideOutputTag = new OutputTag<Integer>("side") {
        };

        TestListResultSink<String> stringSideOutputSink = new TestListResultSink<>();
        TestListResultSink<Integer> intSideOutputSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Only the typed override is declared; the compiler generates the bridge method itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                        ctx.output(stringSideOutputTag, "sideout-" + value);
                        ctx.output(intSideOutputTag, 13);
                    }
                });

        passThroughStream.getSideOutput(stringSideOutputTag).addSink(stringSideOutputSink);

        // Arm the rule only now, so the exception must come from the second, clashing tag.
        this.expectedException.expect(UnsupportedOperationException.class);
        passThroughStream.getSideOutput(intSideOutputTag).addSink(intSideOutputSink);
    }

    /**
     * Verifies side output from a plain {@link ProcessFunction}: each element goes to the main
     * output and {@code "sideout-<n>"} goes to the "side" tag.
     */
    @Test
    public void testProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {
        };

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Only the typed override is declared; the compiler generates the bridge method itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag, "sideout-" + value);
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies side output from a {@link CoProcessFunction} on two connected streams built from
     * the same elements. The first input handles values below 3, the second input values from 3
     * upwards; both emit to the same "side" tag with an input-specific prefix.
     */
    @Test
    public void testCoProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {
        };

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Only the typed overrides are declared; the compiler generates the bridge methods itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .connect(env.fromCollection(elements))
                .process(new CoProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value < 3) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout1-" + value);
                        }
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value >= 3) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout2-" + value);
                        }
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1", "sideout1-2", "sideout2-3", "sideout2-4", "sideout2-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies a {@link CoProcessFunction} writing to two distinct side outputs: the first input
     * emits values below 4 to "side1", the second input emits values from 4 upwards to "side2",
     * and each side output is consumed by its own sink.
     */
    @Test
    public void testCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1") {
        };
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2") {
        };

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Only the typed overrides are declared; the compiler generates the bridge methods itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .connect(env.fromCollection(elements))
                .process(new CoProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value < 4) {
                            out.collect(value);
                            ctx.output(sideOutputTag1, "sideout1-" + value);
                        }
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value >= 4) {
                            out.collect(value);
                            ctx.output(sideOutputTag2, "sideout2-" + value);
                        }
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1", "sideout1-2", "sideout1-3"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList("sideout2-4", "sideout2-5"),
                sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies side output from a {@link ProcessFunction} applied to a keyed stream, where each
     * element is used as its own key.
     */
    @Test
    public void testKeyedProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {
        };

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Only the typed override is declared; the compiler generates the bridge method itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .keyBy(new KeySelector<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag, "sideout-" + value);
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies side output from a {@link CoProcessFunction} (rather than a
     * {@link KeyedCoProcessFunction}) applied to two connected keyed streams. The first input
     * handles values below 3, the second input values from 3 upwards; both emit to the "side"
     * tag with an input-specific prefix.
     */
    @Test
    public void testLegacyKeyedCoProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {
        };

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Only the typed overrides are declared; the compiler generates the bridge methods itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .keyBy(value -> value)
                .connect(env.fromCollection(elements).keyBy(value -> value))
                .process(new CoProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value < 3) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout1-" + value);
                        }
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value >= 3) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout2-" + value);
                        }
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1", "sideout1-2", "sideout2-3", "sideout2-4", "sideout2-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies side output from a {@link KeyedCoProcessFunction} on two connected keyed streams.
     * Side-output records have the form {@code "sideout<input>-<currentKey>-<value>"}; because
     * each element is its own key, key and value are equal in the expected results.
     */
    @Test
    public void testKeyedCoProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {
        };

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Only the typed overrides are declared; the compiler generates the bridge methods itself.
        SingleOutputStreamOperator<Integer> passThroughStream = env
                .fromCollection(elements)
                .keyBy(value -> value)
                .connect(env.fromCollection(elements).keyBy(value -> value))
                .process(new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value < 3) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout1-" + ctx.getCurrentKey() + "-" + value);
                        }
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value >= 3) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout2-" + ctx.getCurrentKey() + "-" + value);
                        }
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1-1", "sideout1-2-2", "sideout2-3-3", "sideout2-4-4", "sideout2-5-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that a {@link CoProcessFunction} applied to two connected keyed streams can emit to
     * two different side outputs, each consumed by its own sink, in addition to the main output.
     *
     * <p>Input 1 forwards values below 4 to the main output and to the {@code side1} tag; input 2
     * forwards values of 4 and above to the main output and to the {@code side2} tag. Both inputs
     * are built from the shared {@code elements} collection.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testLegacyKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
        // The trailing "{}" is intentional: the tags are created as anonymous subclasses.
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1") {};
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // No hand-written (Object, Context, Collector) overloads here: the compiler generates the
        // bridge methods itself, and declaring them in source is an erasure name clash.
        SingleOutputStreamOperator<Integer> passThroughStream = env.fromCollection(elements)
                .keyBy(value -> value)
                .connect(env.fromCollection(elements).keyBy(value -> value))
                .process(new CoProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value < 4) {
                            out.collect(value);
                            ctx.output(sideOutputTag1, "sideout1-" + value);
                        }
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value >= 4) {
                            out.collect(value);
                            ctx.output(sideOutputTag2, "sideout2-" + value);
                        }
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);
        env.execute();

        Assert.assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout1-3"), sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(Arrays.asList("sideout2-4", "sideout2-5"), sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that a {@link KeyedCoProcessFunction} applied to two connected keyed streams can
     * emit to two different side outputs, each consumed by its own sink, in addition to the main
     * output, and that the current key is available while doing so.
     *
     * <p>Input 1 forwards values below 4 to the main output and to the {@code side1} tag; input 2
     * forwards values of 4 and above to the main output and to the {@code side2} tag. Each side
     * output record has the form {@code sideoutN-<current key>-<value>}; the key selector is the
     * identity, so key and value coincide in the expected results.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
        // The trailing "{}" is intentional: the tags are created as anonymous subclasses.
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1") {};
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // No hand-written (Object, Context, Collector) overloads here: the compiler generates the
        // bridge methods itself, and declaring them in source is an erasure name clash.
        SingleOutputStreamOperator<Integer> passThroughStream = env.fromCollection(elements)
                .keyBy(value -> value)
                .connect(env.fromCollection(elements).keyBy(value -> value))
                .process(new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value < 4) {
                            out.collect(value);
                            ctx.output(sideOutputTag1, "sideout1-" + ctx.getCurrentKey() + "-" + value);
                        }
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value >= 4) {
                            out.collect(value);
                            ctx.output(sideOutputTag2, "sideout2-" + ctx.getCurrentKey() + "-" + value);
                        }
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);
        env.execute();

        Assert.assertEquals(Arrays.asList("sideout1-1-1", "sideout1-2-2", "sideout1-3-3"), sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(Arrays.asList("sideout2-4-4", "sideout2-5-5"), sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that reading a side output through a tag the function never writes to yields no
     * records.
     *
     * <p>The {@link ProcessFunction} emits every element to the {@code other-side} tag, while the
     * sink is attached to the side output of the {@code side} tag, so the sink is expected to stay
     * empty.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testProcessFunctionSideOutputWithWrongTag() throws Exception {
        // The trailing "{}" is intentional: the tags are created as anonymous subclasses.
        OutputTag<String> consumedTag = new OutputTag<String>("side") {};
        final OutputTag<String> emittedTag = new OutputTag<String>("other-side") {};

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        env.fromCollection(elements)
                .process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                        // Deliberately writes to the tag that nobody consumes.
                        ctx.output(emittedTag, "sideout-" + value);
                    }
                })
                .getSideOutput(consumedTag)
                .addSink(sideOutputResultSink);
        env.execute();

        Assert.assertEquals(Collections.emptyList(), sideOutputResultSink.getSortedResult());
    }

    /**
     * Verifies that late elements of a non-keyed (all-)window are routed to the side output
     * registered via {@code sideOutputLateData}.
     *
     * <p>The window function passes every window element through unchanged; only the late-data
     * side output is inspected. Each late element is mapped to {@code "late-<value>"} before it
     * reaches the sink, and the values 3 and 4 are expected there.
     *
     * <p>NOTE(review): which elements count as late depends on {@code TestWatermarkAssigner},
     * defined elsewhere in this file - confirm against it.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testAllWindowLateArrivingEvents() throws Exception {
        TestListResultSink<String> lateResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<Integer> dataStream = env.fromCollection(elements);

        // The trailing "{}" is intentional: the tag is created as an anonymous subclass.
        OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {};

        // No hand-written raw overloads of apply/flatMap here: the compiler generates the bridge
        // methods itself, and declaring them in source is an erasure name clash.
        dataStream
                .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
                .timeWindowAll(Time.milliseconds(1L), Time.milliseconds(1L))
                .sideOutputLateData(lateDataTag)
                .apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
                        for (Integer value : values) {
                            out.collect(value);
                        }
                    }
                })
                .getSideOutput(lateDataTag)
                .flatMap(new FlatMapFunction<Integer, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap(Integer value, Collector<String> out) throws Exception {
                        out.collect("late-" + value);
                    }
                })
                .addSink(lateResultSink);
        env.execute();

        // Expected value first, matching JUnit's contract and the other tests in this class.
        Assert.assertEquals(Arrays.asList("late-3", "late-4"), lateResultSink.getSortedResult());
    }

    /**
     * Verifies late-data side output for a keyed window that is configured with an allowed
     * lateness of 2 ms.
     *
     * <p>The window function emits {@code "<key>-<value>"} for every element of each window. The
     * main output is expected to contain the records for 1, 2, 4 and 5, and the late-data side
     * output is expected to contain only the value 3.
     *
     * <p>NOTE(review): which elements count as late depends on {@code TestWatermarkAssigner} and
     * the keys on {@code TestKeySelector}, both defined elsewhere in this file - confirm against
     * them.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testKeyedWindowLateArrivingEvents() throws Exception {
        TestListResultSink<String> resultSink = new TestListResultSink<>();
        TestListResultSink<Integer> lateResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<Integer> dataStream = env.fromCollection(elements);

        // The trailing "{}" is intentional: the tag is created as an anonymous subclass.
        OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {};

        // No hand-written raw overload of apply here: the compiler generates the bridge method
        // itself, and declaring it in source is an erasure name clash.
        SingleOutputStreamOperator<String> windowOperator = dataStream
                .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
                .keyBy(new TestKeySelector())
                .timeWindow(Time.milliseconds(1L), Time.milliseconds(1L))
                .allowedLateness(Time.milliseconds(2L))
                .sideOutputLateData(lateDataTag)
                .apply(new WindowFunction<Integer, String, Integer, TimeWindow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<String> out) throws Exception {
                        for (Integer value : values) {
                            out.collect(String.valueOf(key) + "-" + value);
                        }
                    }
                });

        windowOperator.addSink(resultSink);
        windowOperator.getSideOutput(lateDataTag).addSink(lateResultSink);
        env.execute();

        Assert.assertEquals(Arrays.asList("1-1", "2-2", "4-4", "5-5"), resultSink.getSortedResult());
        Assert.assertEquals(Collections.singletonList(3), lateResultSink.getSortedResult());
    }

    /**
     * Verifies that a {@link ProcessWindowFunction} on a keyed window can emit to a side output
     * through its context.
     *
     * <p>For every fired window the function emits the window key to the main output and
     * {@code "sideout-<key>"} to the {@code side} tag. Both outputs are expected to contain
     * entries for 1, 2 and 5 only.
     *
     * <p>NOTE(review): 3 and 4 are absent from the expected results, presumably because they are
     * dropped as late; this depends on {@code TestWatermarkAssigner}, defined elsewhere in this
     * file - confirm against it.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testProcessdWindowFunctionSideOutput() throws Exception {
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<Integer> dataStream = env.fromCollection(elements);

        // The trailing "{}" is intentional: the tag is created as an anonymous subclass.
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        // No hand-written raw overload of process here: the compiler generates the bridge method
        // itself, and declaring it in source is an erasure name clash.
        SingleOutputStreamOperator<Integer> windowOperator = dataStream
                .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
                .keyBy(new TestKeySelector())
                .timeWindow(Time.milliseconds(1L), Time.milliseconds(1L))
                .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void process(Integer key, Context ctx, Iterable<Integer> values, Collector<Integer> out) throws Exception {
                        // Emits once per window invocation, based on the key rather than the elements.
                        out.collect(key);
                        ctx.output(sideOutputTag, "sideout-" + key);
                    }
                });

        windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        windowOperator.addSink(resultSink);
        env.execute();

        Assert.assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that a {@link ProcessAllWindowFunction} on a non-keyed (all-)window can emit to a
     * side output through its context.
     *
     * <p>For every element of a fired window the function emits the element to the main output
     * and {@code "sideout-<value>"} to the {@code side} tag. Both outputs are expected to contain
     * entries for 1, 2 and 5 only.
     *
     * <p>NOTE(review): 3 and 4 are absent from the expected results, presumably because they are
     * dropped as late; this depends on {@code TestWatermarkAssigner}, defined elsewhere in this
     * file - confirm against it.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testProcessAllWindowFunctionSideOutput() throws Exception {
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<Integer> dataStream = env.fromCollection(elements);

        // The trailing "{}" is intentional: the tag is created as an anonymous subclass.
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        SingleOutputStreamOperator<Integer> windowOperator = dataStream
                .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
                .timeWindowAll(Time.milliseconds(1L), Time.milliseconds(1L))
                .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void process(Context ctx, Iterable<Integer> values, Collector<Integer> out) throws Exception {
                        for (Integer value : values) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout-" + value);
                        }
                    }
                });

        windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        windowOperator.addSink(resultSink);
        env.execute();

        Assert.assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
    }

    /**
     * Synthetic hook that recreates this class's serializable lambdas from their
     * {@link SerializedLambda} form.
     *
     * <p>The method dispatches on the lambda's implementation method name, first by hash code and
     * then by exact string comparison, to pick one of eight branches. Each branch checks that the
     * serialized form describes a {@code KeySelector.getKey} lambda implemented in
     * {@code SideOutputITCase} with signature {@code (Integer) -> Integer}, and if so returns an
     * identity lambda.
     *
     * <p>NOTE(review): this text is decompiler output rather than hand-written source. As
     * rendered it is not valid Java ({@code boolean z = -1}, integer values assigned to a boolean,
     * and repeated {@code case true} labels); the selector was presumably an {@code int} in the
     * bytecode. A method like this is normally emitted by the compiler for serializable lambdas
     * and is not expected to exist in the source file - confirm before keeping it.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return an identity lambda when the serialized form matches one of the known key selectors
     * @throws IllegalArgumentException if the implementation method name is unknown or the
     *     serialized form does not pass the checks of the selected branch
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Branch selector; -1 is the initial "no match" value and stays set if no name matches.
        boolean z = -1;
        // Stage 1: switch on the name's hash code, then confirm with equals() to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -769416149:
                if (implMethodName.equals("lambda$testLegacyKeyedCoProcessFunctionSideOutput$5c9b7dcf$1")) {
                    z = 6;
                    break;
                }
                break;
            case -769416148:
                if (implMethodName.equals("lambda$testLegacyKeyedCoProcessFunctionSideOutput$5c9b7dcf$2")) {
                    z = 7;
                    break;
                }
                break;
            case 612433012:
                if (implMethodName.equals("lambda$testKeyedCoProcessFunctionSideOutput$5c9b7dcf$1")) {
                    z = 2;
                    break;
                }
                break;
            case 612433013:
                if (implMethodName.equals("lambda$testKeyedCoProcessFunctionSideOutput$5c9b7dcf$2")) {
                    z = 3;
                    break;
                }
                break;
            case 1640582910:
                if (implMethodName.equals("lambda$testLegacyKeyedCoProcessFunctionSideOutputWithMultipleConsumers$5c9b7dcf$1")) {
                    z = 5;
                    break;
                }
                break;
            case 1640582911:
                if (implMethodName.equals("lambda$testLegacyKeyedCoProcessFunctionSideOutputWithMultipleConsumers$5c9b7dcf$2")) {
                    z = 4;
                    break;
                }
                break;
            case 1876242005:
                if (implMethodName.equals("lambda$testKeyedCoProcessFunctionSideOutputWithMultipleConsumers$5c9b7dcf$1")) {
                    z = false;
                    break;
                }
                break;
            case 1876242006:
                if (implMethodName.equals("lambda$testKeyedCoProcessFunctionSideOutputWithMultipleConsumers$5c9b7dcf$2")) {
                    z = true;
                    break;
                }
                break;
        }
        // Stage 2: one branch per selector value. Every branch performs the same checks on the
        // serialized form and returns an identity lambda. NOTE(review): the labels below were
        // presumably 0..7 before the decompiler rendered them as booleans.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SideOutputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SideOutputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num2 -> {
                        return num2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SideOutputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num3 -> {
                        return num3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SideOutputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num22 -> {
                        return num22;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SideOutputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num23 -> {
                        return num23;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SideOutputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num4 -> {
                        return num4;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SideOutputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num5 -> {
                        return num5;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SideOutputITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num24 -> {
                        return num24;
                    };
                }
                break;
        }
        // Reached when no name matched or the selected branch's checks failed.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    // Fills the shared test input with 1, 2, 5, 3, 4 in exactly this order.
    // NOTE(review): 5 precedes 3 and 4, presumably so that those two can show up as late
    // elements in the event-time window tests - confirm against TestWatermarkAssigner.
    static {
        for (int value : new int[] {1, 2, 5, 3, 4}) {
            elements.add(value);
        }
    }
}
