package org.apache.flink.api.datastream;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Collector;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.class */
public class DataStreamBatchExecutionITCase {
    /** Value passed to {@code setParallelism} by the tests and used to size the mini cluster. */
    private static final int DEFAULT_PARALLELISM = 1;

    /** Class-level JUnit rule providing the Flink mini cluster the tests run against. */
    @ClassRule
    public static MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(DEFAULT_PARALLELISM)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .build());

    /** Descriptor of the map state written and read by the broadcast process functions. */
    static final MapStateDescriptor<String, String> STATE_DESCRIPTOR =
            new MapStateDescriptor<>(
                    "bc-input", StringSerializer.INSTANCE, StringSerializer.INSTANCE);

    /** Descriptor handed to {@code applyToKeyedState} by the keyed broadcast function. */
    static final ValueStateDescriptor<String> KEYED_STATE_DESCRIPTOR =
            new ValueStateDescriptor<>("keyed-state", StringSerializer.INSTANCE);

    /** Descriptor of the operator list state used by the "mixed input" test operators. */
    static final ListStateDescriptor<String> LIST_STATE_DESCRIPTOR =
            new ListStateDescriptor<>("bc-list-input", StringSerializer.INSTANCE);
    /**
     * Operator factory that instantiates {@link TestMixedMultipleInputOperator} from the supplied
     * {@link StreamOperatorParameters}.
     */
    private static final AbstractStreamOperatorFactory<String> mixedInputsOperatorFactory =
            new AbstractStreamOperatorFactory<String>() {

                /**
                 * Creates a new {@link TestMixedMultipleInputOperator}.
                 *
                 * @param streamOperatorParameters parameters forwarded to the operator constructor
                 * @return the freshly created operator, cast to the caller-chosen type {@code T}
                 */
                @Override
                @SuppressWarnings("unchecked") // T is chosen by the caller; the cast cannot be checked
                public <T extends StreamOperator<String>> T createStreamOperator(
                        StreamOperatorParameters<String> streamOperatorParameters) {
                    // The explicit cast is required: the concrete operator type is not a subtype
                    // of the type variable T as far as the compiler is concerned.
                    return (T) new TestMixedMultipleInputOperator(streamOperatorParameters);
                }

                /**
                 * @param classLoader unused
                 * @return the class of the operator this factory creates
                 */
                @Override
                @SuppressWarnings("rawtypes") // raw StreamOperator is dictated by the overridden signature
                public Class<? extends StreamOperator> getStreamOperatorClass(
                        ClassLoader classLoader) {
                    return TestMixedMultipleInputOperator.class;
                }
            };

    /* loaded from: input_file:org/apache/flink/api/datastream/DataStreamBatchExecutionITCase$OnceFailingMapper.class */
    public static class OnceFailingMapper extends RichMapFunction<String, String> {
        private final String suffix;

        public OnceFailingMapper(String str) {
            this.suffix = str;
        }

        public String map(String str) throws Exception {
            if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
                throw new RuntimeException("FAILING");
            }
            return str + "-" + this.suffix + getRuntimeContext().getTaskInfo().getAttemptNumber();
        }
    }

    /* loaded from: input_file:org/apache/flink/api/datastream/DataStreamBatchExecutionITCase$SuffixAttemptId.class */
    public static class SuffixAttemptId extends RichMapFunction<String, String> {
        private final String suffix;

        public SuffixAttemptId(String str) {
            this.suffix = str;
        }

        public String map(String str) {
            return str + "-" + this.suffix + getRuntimeContext().getTaskInfo().getAttemptNumber();
        }
    }

    /* loaded from: input_file:org/apache/flink/api/datastream/DataStreamBatchExecutionITCase$TestBroadcastFunction.class */
    private static class TestBroadcastFunction extends BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private TestBroadcastFunction() {
        }

        public void processElement(Tuple2<String, Integer> tuple2, BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
            collector.collect(tuple2 + ": " + readOnlyContext.getBroadcastState(DataStreamBatchExecutionITCase.STATE_DESCRIPTOR).immutableEntries().toString());
        }

        public void processBroadcastElement(Tuple2<String, Integer> tuple2, BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context context, Collector<String> collector) throws Exception {
            context.getBroadcastState(DataStreamBatchExecutionITCase.STATE_DESCRIPTOR).put(tuple2.f0, tuple2.f0);
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, BroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((Tuple2<String, Integer>) obj, (BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, BroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Tuple2<String, Integer>) obj, (BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.ReadOnlyContext) readOnlyContext, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/api/datastream/DataStreamBatchExecutionITCase$TestKeyedBroadcastFunction.class */
    private static class TestKeyedBroadcastFunction extends KeyedBroadcastProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private TestKeyedBroadcastFunction() {
        }

        public void processElement(Tuple2<String, Integer> tuple2, KeyedBroadcastProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
            collector.collect(tuple2 + ": " + readOnlyContext.getBroadcastState(DataStreamBatchExecutionITCase.STATE_DESCRIPTOR).immutableEntries().toString());
        }

        public void processBroadcastElement(Tuple2<String, Integer> tuple2, KeyedBroadcastProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context context, Collector<String> collector) throws Exception {
            context.getBroadcastState(DataStreamBatchExecutionITCase.STATE_DESCRIPTOR).put(tuple2.f0, tuple2.f0);
            context.applyToKeyedState(DataStreamBatchExecutionITCase.KEYED_STATE_DESCRIPTOR, (str, valueState) -> {
                throw new RuntimeException("Shouldn't happen");
            });
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, KeyedBroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((Tuple2<String, Integer>) obj, (KeyedBroadcastProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedBroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Tuple2<String, Integer>) obj, (KeyedBroadcastProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>, String>.ReadOnlyContext) readOnlyContext, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/api/datastream/DataStreamBatchExecutionITCase$TestMixedMultipleInputOperator.class */
    private static class TestMixedMultipleInputOperator extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String> {
        public TestMixedMultipleInputOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            super(streamOperatorParameters, 3);
        }

        public List<Input> getInputs() {
            return Arrays.asList(new AbstractInput<Tuple2<String, Integer>, String>(this, DataStreamBatchExecutionITCase.DEFAULT_PARALLELISM) { // from class: org.apache.flink.api.datastream.DataStreamBatchExecutionITCase.TestMixedMultipleInputOperator.1
                public void processElement(StreamRecord<Tuple2<String, Integer>> streamRecord) throws Exception {
                    this.output.collect(new StreamRecord(streamRecord.getValue() + ": " + ((Iterable) TestMixedMultipleInputOperator.this.getOperatorStateBackend().getListState(DataStreamBatchExecutionITCase.LIST_STATE_DESCRIPTOR).get()).toString()));
                }
            }, new AbstractInput<Tuple2<String, Integer>, String>(this, 2) { // from class: org.apache.flink.api.datastream.DataStreamBatchExecutionITCase.TestMixedMultipleInputOperator.2
                public void processElement(StreamRecord<Tuple2<String, Integer>> streamRecord) throws Exception {
                    TestMixedMultipleInputOperator.this.getOperatorStateBackend().getListState(DataStreamBatchExecutionITCase.LIST_STATE_DESCRIPTOR).add(((Tuple2) streamRecord.getValue()).f0);
                }
            }, new AbstractInput<Tuple2<String, Integer>, String>(this, 3) { // from class: org.apache.flink.api.datastream.DataStreamBatchExecutionITCase.TestMixedMultipleInputOperator.3
                public void processElement(StreamRecord<Tuple2<String, Integer>> streamRecord) throws Exception {
                    TestMixedMultipleInputOperator.this.getOperatorStateBackend().getListState(DataStreamBatchExecutionITCase.LIST_STATE_DESCRIPTOR).add(((Tuple2) streamRecord.getValue()).f0);
                }
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/api/datastream/DataStreamBatchExecutionITCase$TestMixedTwoInputOperator.class */
    private static final class TestMixedTwoInputOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private TestMixedTwoInputOperator() {
        }

        public void processElement1(StreamRecord<Tuple2<String, Integer>> streamRecord) throws Exception {
            this.output.collect(new StreamRecord(streamRecord.getValue() + ": " + ((Iterable) getOperatorStateBackend().getListState(DataStreamBatchExecutionITCase.LIST_STATE_DESCRIPTOR).get()).toString()));
        }

        public void processElement2(StreamRecord<Tuple2<String, Integer>> streamRecord) throws Exception {
            getOperatorStateBackend().getListState(DataStreamBatchExecutionITCase.LIST_STATE_DESCRIPTOR).add(((Tuple2) streamRecord.getValue()).f0);
        }
    }

    /* loaded from: input_file:org/apache/flink/api/datastream/DataStreamBatchExecutionITCase$TwoInputIdentityOperator.class */
    private static class TwoInputIdentityOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private TwoInputIdentityOperator() {
        }

        public void processElement1(StreamRecord<Tuple2<String, Integer>> streamRecord) throws Exception {
            this.output.collect(new StreamRecord(((Tuple2) streamRecord.getValue()).toString(), streamRecord.getTimestamp()));
        }

        public void processElement2(StreamRecord<Tuple2<String, Integer>> streamRecord) throws Exception {
            this.output.collect(new StreamRecord(((Tuple2) streamRecord.getValue()).toString(), streamRecord.getTimestamp()));
        }
    }

    /**
     * Runs a pipeline whose last mapper ({@link OnceFailingMapper}) throws on attempt 0, with a
     * {@code keyBy} between the second and third mapper.
     *
     * <p>The expected records carry the suffixes {@code a0-b0} for the mappers in front of the
     * {@code keyBy} and {@code c1-d1} for the mappers behind it, i.e. the surviving output was
     * produced by attempt 0 of the former and attempt 1 of the latter.
     */
    @Test
    public void batchFailoverWithKeyByBarrier() throws Exception {
        DataStream<String> mapped =
                getExecutionEnvironment()
                        .fromData("foo", "bar")
                        .map(new SuffixAttemptId("a"))
                        .map(new SuffixAttemptId("b"))
                        .keyBy(value -> value)
                        .map(new SuffixAttemptId("c"))
                        .map(new OnceFailingMapper("d"));

        // try-with-resources closes the iterator on every path and keeps a failing close()
        // from hiding an assertion failure.
        try (CloseableIterator<String> resultIterator = mapped.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results, Matchers.containsInAnyOrder("foo-a0-b0-c1-d1", "bar-a0-b0-c1-d1"));
        }
    }

    /**
     * Runs a pipeline whose last mapper ({@link OnceFailingMapper}) throws on attempt 0, with a
     * {@code rebalance} between the second and third mapper.
     *
     * <p>The expected records carry the suffixes {@code a0-b0} for the mappers in front of the
     * {@code rebalance} and {@code c1-d1} for the mappers behind it.
     */
    @Test
    public void batchFailoverWithRebalanceBarrier() throws Exception {
        DataStream<String> mapped =
                getExecutionEnvironment()
                        .fromData("foo", "bar")
                        .map(new SuffixAttemptId("a"))
                        .map(new SuffixAttemptId("b"))
                        .rebalance()
                        .map(new SuffixAttemptId("c"))
                        .map(new OnceFailingMapper("d"));

        // try-with-resources: an exception from close() is attached as suppressed instead of
        // replacing an assertion failure.
        try (CloseableIterator<String> resultIterator = mapped.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results, Matchers.containsInAnyOrder("foo-a0-b0-c1-d1", "bar-a0-b0-c1-d1"));
        }
    }

    /**
     * Runs a pipeline whose last mapper ({@link OnceFailingMapper}) throws on attempt 0, with a
     * {@code rescale} between the second and third mapper; the two mappers behind the
     * {@code rescale} run with parallelism 2.
     *
     * <p>The expected records carry the suffixes {@code a0-b0} for the mappers in front of the
     * {@code rescale} and {@code c1-d1} for the mappers behind it.
     */
    @Test
    public void batchFailoverWithRescaleBarrier() throws Exception {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        DataStreamSource<String> source = env.fromData("foo", "bar");
        // Kept after the source creation, exactly as in the original statement order.
        env.setParallelism(DEFAULT_PARALLELISM);

        DataStream<String> mapped =
                source.map(new SuffixAttemptId("a"))
                        .map(new SuffixAttemptId("b"))
                        .rescale()
                        .map(new SuffixAttemptId("c"))
                        .setParallelism(2)
                        .map(new OnceFailingMapper("d"))
                        .setParallelism(2);

        // try-with-resources: an exception from close() is attached as suppressed instead of
        // replacing an assertion failure.
        try (CloseableIterator<String> resultIterator = mapped.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results, Matchers.containsInAnyOrder("foo-a0-b0-c1-d1", "bar-a0-b0-c1-d1"));
        }
    }

    /**
     * Reduces the sequence 0..10, keyed by {@code value % 2}, with {@link Long#sum} and expects
     * exactly one result per key: 30 for the even numbers followed by 25 for the odd numbers.
     */
    @Test
    public void batchReduceSingleResultPerKey() throws Exception {
        DataStream<Long> sums =
                getExecutionEnvironment()
                        .fromSequence(0L, 10L)
                        .keyBy(value -> value % 2)
                        .reduce(Long::sum);

        // try-with-resources closes the iterator on every path and keeps a failing close()
        // from hiding an assertion failure.
        try (CloseableIterator<Long> sumsIterator = sums.executeAndCollect()) {
            List<Long> results = CollectionUtil.iteratorToList(sumsIterator);
            Assert.assertThat(results, CoreMatchers.equalTo(Arrays.asList(30L, 25L)));
        }
    }

    /**
     * Applies {@code sum(0)} to the sequence 0..10, keyed by {@code value % 2}, and expects
     * exactly one result per key: 30 for the even numbers followed by 25 for the odd numbers.
     */
    @Test
    public void batchSumSingleResultPerKey() throws Exception {
        DataStream<Long> sums =
                getExecutionEnvironment()
                        .fromSequence(0L, 10L)
                        .keyBy(value -> value % 2)
                        .sum(0);

        // try-with-resources closes the iterator on every path and keeps a failing close()
        // from hiding an assertion failure.
        try (CloseableIterator<Long> sumsIterator = sums.executeAndCollect()) {
            List<Long> results = CollectionUtil.iteratorToList(sumsIterator);
            Assert.assertThat(results, CoreMatchers.equalTo(Arrays.asList(30L, 25L)));
        }
    }

    /**
     * Connects a keyed first input with a non-keyed second input in BATCH runtime mode, feeds
     * both into {@link TwoInputIdentityOperator} and checks the exact order of the emitted
     * records.
     */
    @Test
    public void batchKeyedNonKeyedTwoInputOperator() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Timestamps are taken from the integer field of each tuple.
        KeyedStream<Tuple2<String, Integer>, String> keyedInput =
                env.fromData(
                                Tuple2.of("regular2", 4),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular2", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1))
                        .keyBy(in -> in.f0);

        DataStream<Tuple2<String, Integer>> regularInput =
                env.fromData(
                                Tuple2.of("regular4", 4),
                                Tuple2.of("regular3", 3),
                                Tuple2.of("regular3", 2),
                                Tuple2.of("regular4", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        DataStream<String> result =
                keyedInput
                        .connect(regularInput)
                        .transform(
                                "operator",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                new TwoInputIdentityOperator());

        // try-with-resources: an exception from close() is attached as suppressed instead of
        // replacing an assertion failure.
        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular4,4)",
                                    "(regular3,3)",
                                    "(regular3,2)",
                                    "(regular4,1)",
                                    "(regular1,2)",
                                    "(regular1,3)",
                                    "(regular2,1)",
                                    "(regular2,4)")));
        }
    }

    /**
     * Connects a non-keyed first input with a keyed second input in BATCH runtime mode, feeds
     * both into {@link TwoInputIdentityOperator} and checks the exact order of the emitted
     * records.
     */
    @Test
    public void batchNonKeyedKeyedTwoInputOperator() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Timestamps are taken from the integer field of each tuple.
        DataStream<Tuple2<String, Integer>> regularInput =
                env.fromData(
                                Tuple2.of("regular4", 4),
                                Tuple2.of("regular3", 3),
                                Tuple2.of("regular3", 2),
                                Tuple2.of("regular4", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        KeyedStream<Tuple2<String, Integer>, String> keyedInput =
                env.fromData(
                                Tuple2.of("regular2", 4),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular2", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1))
                        .keyBy(in -> in.f0);

        DataStream<String> result =
                regularInput
                        .connect(keyedInput)
                        .transform(
                                "operator",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                new TwoInputIdentityOperator());

        // try-with-resources: an exception from close() is attached as suppressed instead of
        // replacing an assertion failure.
        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular4,4)",
                                    "(regular3,3)",
                                    "(regular3,2)",
                                    "(regular4,1)",
                                    "(regular1,2)",
                                    "(regular1,3)",
                                    "(regular2,1)",
                                    "(regular2,4)")));
        }
    }

    /**
     * Connects a keyed input with a broadcast input in BATCH runtime mode and processes them with
     * {@link TestKeyedBroadcastFunction}.
     *
     * <p>Every expected record lists all three broadcast entries, and the expected records are
     * grouped by key and ordered by their integer field within each key.
     */
    @Test
    public void batchKeyedBroadcastExecution() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Timestamps are taken from the integer field of each tuple.
        KeyedStream<Tuple2<String, Integer>, String> keyedInput =
                env.fromData(
                                Tuple2.of("regular1", 1),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular2", 2),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 4),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular2", 5),
                                Tuple2.of("regular1", 5),
                                Tuple2.of("regular2", 3),
                                Tuple2.of("regular1", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1))
                        .keyBy(in -> in.f0);

        DataStream<Tuple2<String, Integer>> broadcastInput =
                env.fromData(Tuple2.of("bc1", 1), Tuple2.of("bc2", 2), Tuple2.of("bc3", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        DataStream<String> result =
                keyedInput
                        .connect(broadcastInput.broadcast(STATE_DESCRIPTOR))
                        .process(new TestKeyedBroadcastFunction());

        // try-with-resources closes the iterator on every path and keeps a failing close()
        // from hiding an assertion failure.
        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular1,1): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,4): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,5): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular2,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular2,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular2,5): [bc2=bc2, bc1=bc1, bc3=bc3]")));
        }
    }

    /**
     * Connects a non-keyed input with a broadcast input in BATCH runtime mode and processes them
     * with {@link TestBroadcastFunction}.
     *
     * <p>Every expected record lists all three broadcast entries, and the expected records appear
     * in the same order as the regular input elements.
     */
    @Test
    public void batchBroadcastExecution() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Timestamps are taken from the integer field of each tuple.
        DataStream<Tuple2<String, Integer>> regularInput =
                env.fromData(
                                Tuple2.of("regular1", 1),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 4),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 5),
                                Tuple2.of("regular1", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        DataStream<Tuple2<String, Integer>> broadcastInput =
                env.fromData(Tuple2.of("bc1", 1), Tuple2.of("bc2", 2), Tuple2.of("bc3", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        DataStream<String> result =
                regularInput
                        .connect(broadcastInput.broadcast(STATE_DESCRIPTOR))
                        .process(new TestBroadcastFunction());

        // try-with-resources closes the iterator on every path and keeps a failing close()
        // from hiding an assertion failure.
        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular1,1): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,4): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,5): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]")));
        }
    }

    /**
     * Runs a two-input operator in BATCH runtime mode where the first input is keyed by the
     * tuple's {@code f0} field and the second input is broadcast, then checks the collected
     * output against the expected list of strings.
     *
     * <p>Both inputs take their timestamps from the tuple's {@code f1} field. Only the first
     * input gets a state key selector; the second selector is {@code null}.
     *
     * @throws Exception if job execution, result collection, or closing the result iterator fails
     */
    @Test
    public void batchMixedKeyedAndNonKeyedTwoInputOperator() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(DEFAULT_PARALLELISM);
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // First input: regular elements, keyed by f0.
        KeyedStream keyedInput = executionEnvironment.fromData(new Tuple2[]{Tuple2.of("regular1", Integer.valueOf(DEFAULT_PARALLELISM)), Tuple2.of("regular1", 2), Tuple2.of("regular1", 3), Tuple2.of("regular1", 4), Tuple2.of("regular2", 3), Tuple2.of("regular2", 5), Tuple2.of("regular1", 3)}).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((tuple2, j) -> {
            return ((Integer) tuple2.f1).intValue();
        })).keyBy(tuple22 -> {
            return (String) tuple22.f0;
        });

        // Second input: broadcast elements, not keyed.
        DataStream broadcastInput = executionEnvironment.fromData(new Tuple2[]{Tuple2.of("bc3", 3), Tuple2.of("bc2", 2), Tuple2.of("bc1", Integer.valueOf(DEFAULT_PARALLELISM))}).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((tuple23, j2) -> {
            return ((Integer) tuple23.f1).intValue();
        })).broadcast();

        TwoInputTransformation twoInputTransformation = new TwoInputTransformation(keyedInput.getTransformation(), broadcastInput.getTransformation(), "operator", new TestMixedTwoInputOperator(), BasicTypeInfo.STRING_TYPE_INFO, DEFAULT_PARALLELISM);
        twoInputTransformation.setStateKeyType(BasicTypeInfo.STRING_TYPE_INFO);
        twoInputTransformation.setStateKeySelectors(tuple24 -> {
            return (String) tuple24.f0;
        }, (KeySelector) null);

        // try-with-resources closes the iterator exactly once and keeps an assertion failure as
        // the primary exception, attaching any close() failure to it as suppressed.
        try (CloseableIterator executeAndCollect = new DataStream(executionEnvironment, twoInputTransformation).executeAndCollect()) {
            Assert.assertThat(CollectionUtil.iteratorToList(executeAndCollect), CoreMatchers.equalTo(Arrays.asList("(regular1,1): [bc3, bc2, bc1]", "(regular1,2): [bc3, bc2, bc1]", "(regular1,3): [bc3, bc2, bc1]", "(regular1,3): [bc3, bc2, bc1]", "(regular1,4): [bc3, bc2, bc1]", "(regular2,3): [bc3, bc2, bc1]", "(regular2,5): [bc3, bc2, bc1]")));
        }
    }

    /**
     * Runs a keyed multiple-input operator in BATCH runtime mode with one keyed input and two
     * broadcast inputs, then checks the collected output against the expected list of strings.
     *
     * <p>All inputs take their timestamps from the tuple's {@code f1} field. The keyed input is
     * registered with a key selector on {@code f0}; both broadcast inputs are registered with a
     * {@code null} key selector.
     *
     * @throws Exception if job execution, result collection, or closing the result iterator fails
     */
    @Test
    public void batchMixedKeyedAndNonKeyedMultiInputOperator() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // First broadcast input: "bc3" and "bc2".
        DataStream broadcast = executionEnvironment.fromData(new Tuple2[]{Tuple2.of("bc3", 3), Tuple2.of("bc2", 2)}).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((tuple2, j) -> {
            return ((Integer) tuple2.f1).intValue();
        })).broadcast();

        // Second broadcast input: "bc1".
        DataStream broadcast2 = executionEnvironment.fromData(new Tuple2[]{Tuple2.of("bc1", Integer.valueOf(DEFAULT_PARALLELISM))}).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((tuple22, j2) -> {
            return ((Integer) tuple22.f1).intValue();
        })).broadcast();

        // Regular input, keyed by f0.
        KeyedStream keyBy = executionEnvironment.fromData(new Tuple2[]{Tuple2.of("regular1", Integer.valueOf(DEFAULT_PARALLELISM)), Tuple2.of("regular1", 2), Tuple2.of("regular1", 3), Tuple2.of("regular1", 4), Tuple2.of("regular2", 3), Tuple2.of("regular2", 5), Tuple2.of("regular1", 3)}).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((tuple23, j3) -> {
            return ((Integer) tuple23.f1).intValue();
        })).keyBy(tuple24 -> {
            return (String) tuple24.f0;
        });

        KeyedMultipleInputTransformation keyedMultipleInputTransformation = new KeyedMultipleInputTransformation("operator", mixedInputsOperatorFactory, BasicTypeInfo.STRING_TYPE_INFO, DEFAULT_PARALLELISM, BasicTypeInfo.STRING_TYPE_INFO);
        keyedMultipleInputTransformation.addInput(keyBy.getTransformation(), obj -> {
            return (String) ((Tuple2) obj).f0;
        });
        keyedMultipleInputTransformation.addInput(broadcast.getTransformation(), (KeySelector) null);
        keyedMultipleInputTransformation.addInput(broadcast2.getTransformation(), (KeySelector) null);

        // try-with-resources keeps an assertion failure as the primary exception; a failure in
        // close() is attached to it as suppressed instead of replacing it.
        try (CloseableIterator executeAndCollect = new MultipleConnectedStreams(executionEnvironment).transform(keyedMultipleInputTransformation).executeAndCollect()) {
            Assert.assertThat(CollectionUtil.iteratorToList(executeAndCollect), CoreMatchers.equalTo(Arrays.asList("(regular1,1): [bc3, bc2, bc1]", "(regular1,2): [bc3, bc2, bc1]", "(regular1,3): [bc3, bc2, bc1]", "(regular1,3): [bc3, bc2, bc1]", "(regular1,4): [bc3, bc2, bc1]", "(regular2,3): [bc3, bc2, bc1]", "(regular2,5): [bc3, bc2, bc1]")));
        }
    }

    /**
     * Creates a stream execution environment configured for BATCH runtime mode with parallelism
     * {@code DEFAULT_PARALLELISM}, checkpointing enabled, and a fixed-delay restart strategy.
     *
     * @return the configured environment
     */
    private StreamExecutionEnvironment getExecutionEnvironment() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(DEFAULT_PARALLELISM);

        // NOTE(review): checkpointing is enabled although the runtime mode is BATCH; the reason
        // is not visible here -- confirm before changing.
        env.enableCheckpointing(42L);

        // Restart up to 10 times, waiting 1 ms between attempts.
        final int maxRestartAttempts = 10;
        final Time delayBetweenAttempts = Time.milliseconds(1L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(maxRestartAttempts, delayBetweenAttempts));

        return env;
    }

    /**
     * Lambda deserialization hook: maps a {@link SerializedLambda} back to one of the lambdas
     * declared in this class.
     *
     * <p>The method works in two steps. The first switch selects an index from the lambda's
     * implementation method name (dispatching on its hash code, then confirming with
     * {@code equals}). The second switch checks the remaining metadata of the serialized form
     * (implementation method kind, functional interface class, method name and signature,
     * implementing class and implementation signature) and, if everything matches, returns the
     * corresponding lambda.
     *
     * <p>NOTE(review): this method is marked synthetic and reads like decompiler output of a
     * compiler-generated method. {@code boolean z = -1}, the integer assignments to {@code z} and
     * the repeated {@code case true} labels are not valid Java source -- confirm whether this
     * method should be present in source at all.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the lambda matching the serialized form
     * @throws IllegalArgumentException if no known lambda matches the serialized form
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Index of the matched lambda; stays at -1 when the name is unknown.
        boolean z = -1;
        // Step 1: resolve the implementation method name to an index.
        switch (implMethodName.hashCode()) {
            case -2136762506:
                if (implMethodName.equals("lambda$batchBroadcastExecution$3c3c30ea$1")) {
                    z = 18;
                    break;
                }
                break;
            case -2053898252:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedTwoInputOperator$b1755bf9$1")) {
                    z = 4;
                    break;
                }
                break;
            case -1995316630:
                if (implMethodName.equals("lambda$batchBroadcastExecution$b1755bf9$1")) {
                    z = 20;
                    break;
                }
                break;
            case -1991176211:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedTwoInputOperator$e5e160a1$1")) {
                    z = 22;
                    break;
                }
                break;
            case -1930188707:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedMultiInputOperator$7d45171f$1")) {
                    z = 13;
                    break;
                }
                break;
            case -1896541776:
                if (implMethodName.equals("lambda$batchFailoverWithKeyByBarrier$87c86e1c$1")) {
                    z = false;
                    break;
                }
                break;
            case -1864354004:
                if (implMethodName.equals("lambda$batchNonKeyedKeyedTwoInputOperator$8ca5c1cc$1")) {
                    z = 9;
                    break;
                }
                break;
            case -994535483:
                if (implMethodName.equals("lambda$batchKeyedBroadcastExecution$9747a172$1")) {
                    z = 23;
                    break;
                }
                break;
            case -537248823:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedMultiInputOperator$3558be8e$1")) {
                    z = 16;
                    break;
                }
                break;
            case -522399897:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedMultiInputOperator$485edbde$1")) {
                    z = 21;
                    break;
                }
                break;
            case -402875150:
                if (implMethodName.equals("lambda$batchKeyedBroadcastExecution$3c3c30ea$1")) {
                    z = 12;
                    break;
                }
                break;
            case -261429274:
                if (implMethodName.equals("lambda$batchKeyedBroadcastExecution$b1755bf9$1")) {
                    z = 10;
                    break;
                }
                break;
            case -180765572:
                if (implMethodName.equals("lambda$batchReduceSingleResultPerKey$e5b1066f$1")) {
                    z = DEFAULT_PARALLELISM;
                    break;
                }
                break;
            case 114251:
                if (implMethodName.equals("sum")) {
                    z = 6;
                    break;
                }
                break;
            case 240651305:
                if (implMethodName.equals("lambda$batchKeyedNonKeyedTwoInputOperator$9747a172$1")) {
                    z = 14;
                    break;
                }
                break;
            case 419063260:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedTwoInputOperator$3558be8e$1")) {
                    z = 3;
                    break;
                }
                break;
            case 757054955:
                if (implMethodName.equals("lambda$batchNonKeyedKeyedTwoInputOperator$9747a172$1")) {
                    z = 17;
                    break;
                }
                break;
            case 775896913:
                if (implMethodName.equals("lambda$batchSumSingleResultPerKey$e5b1066f$1")) {
                    z = 11;
                    break;
                }
                break;
            case 832311638:
                if (implMethodName.equals("lambda$batchKeyedNonKeyedTwoInputOperator$3c3c30ea$1")) {
                    z = 7;
                    break;
                }
                break;
            case 1143311085:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedMultiInputOperator$3c3c30ea$1")) {
                    z = 5;
                    break;
                }
                break;
            case 1347479002:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedMultiInputOperator$e5e160a1$1")) {
                    z = 8;
                    break;
                }
                break;
            case 1348715288:
                if (implMethodName.equals("lambda$batchNonKeyedKeyedTwoInputOperator$3c3c30ea$1")) {
                    z = 19;
                    break;
                }
                break;
            case 1914209642:
                if (implMethodName.equals("lambda$batchKeyedNonKeyedTwoInputOperator$8ca5c1cc$1")) {
                    z = 15;
                    break;
                }
                break;
            case 2099623168:
                if (implMethodName.equals("lambda$batchMixedKeyedAndNonKeyedTwoInputOperator$3c3c30ea$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Step 2: verify the serialized metadata for the selected index and return the lambda.
        // NOTE(review): the labels below presumably stand for indices 0..23 in order (the
        // decompiler rendered every index above 1 as `true`) -- confirm against the bytecode.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str -> {
                        return str;
                    };
                }
                break;
            case DEFAULT_PARALLELISM /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return Long.valueOf(l.longValue() % 2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple2, j) -> {
                        return ((Integer) tuple2.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Object;")) {
                    return tuple24 -> {
                        return (String) tuple24.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple23, j2) -> {
                        return ((Integer) tuple23.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple232, j3) -> {
                        return ((Integer) tuple232.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("(JJ)J")) {
                    return (v0, v1) -> {
                        return Long.sum(v0, v1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple233, j22) -> {
                        return ((Integer) tuple233.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/String;")) {
                    return tuple242 -> {
                        return (String) tuple242.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple22, j23) -> {
                        return ((Integer) tuple22.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple234, j24) -> {
                        return ((Integer) tuple234.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l2 -> {
                        return Long.valueOf(l2.longValue() % 2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple25, j4) -> {
                        return ((Integer) tuple25.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple222, j25) -> {
                        return ((Integer) tuple222.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/String;")) {
                    return tuple223 -> {
                        return (String) tuple223.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple26, j5) -> {
                        return ((Integer) tuple26.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj -> {
                        return (String) ((Tuple2) obj).f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/String;")) {
                    return tuple235 -> {
                        return (String) tuple235.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple27, j6) -> {
                        return ((Integer) tuple27.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple28, j7) -> {
                        return ((Integer) tuple28.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple224, j26) -> {
                        return ((Integer) tuple224.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;J)J")) {
                    return (tuple29, j8) -> {
                        return ((Integer) tuple29.f1).intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/String;")) {
                    return tuple225 -> {
                        return (String) tuple225.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/datastream/DataStreamBatchExecutionITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/String;")) {
                    return tuple226 -> {
                        return (String) tuple226.f0;
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
