/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.datastream;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Collector;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class DataStreamBatchExecutionITCase {
    /** Number of slots offered by the single task manager of the shared mini cluster. */
    private static final int DEFAULT_PARALLELISM = 1;

    /** Mini cluster shared by all tests in this class: one task manager, one slot. */
    @ClassRule
    public static MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .build());

    /** Broadcast (map) state written by the broadcast side of the broadcast-function tests. */
    static final MapStateDescriptor<String, String> STATE_DESCRIPTOR =
            new MapStateDescriptor<>(
                    "bc-input", StringSerializer.INSTANCE, StringSerializer.INSTANCE);

    /** Keyed state handed to {@code applyToKeyedState} in the keyed broadcast test. */
    static final ValueStateDescriptor<String> KEYED_STATE_DESCRIPTOR =
            new ValueStateDescriptor<>("keyed-state", StringSerializer.INSTANCE);

    /** Operator list state used by the mixed keyed/non-keyed operators to buffer broadcast input. */
    static final ListStateDescriptor<String> LIST_STATE_DESCRIPTOR =
            new ListStateDescriptor<>("bc-list-input", StringSerializer.INSTANCE);
    /** Factory that instantiates {@link TestMixedMultipleInputOperator} for the multi-input test. */
    private static final AbstractStreamOperatorFactory<String> mixedInputsOperatorFactory =
            new AbstractStreamOperatorFactory<String>() {

                /** Creates a fresh operator from the supplied runtime parameters. */
                @Override
                @SuppressWarnings("unchecked")
                public <T extends StreamOperator<String>> T createStreamOperator(
                        StreamOperatorParameters<String> parameters) {
                    StreamOperator<String> operator =
                            new TestMixedMultipleInputOperator(parameters);
                    // Unchecked by necessity: the caller chooses T, we always build this class.
                    return (T) operator;
                }

                /** Reports the concrete operator class produced by this factory. */
                @Override
                @SuppressWarnings("rawtypes")
                public Class<? extends StreamOperator> getStreamOperatorClass(
                        ClassLoader classLoader) {
                    return TestMixedMultipleInputOperator.class;
                }
            };

    /**
     * Runs a map chain split by a {@code keyBy} and lets the last mapper fail on its first
     * attempt.
     *
     * <p>The expected output pins that the mappers in front of the {@code keyBy} ("a", "b") report
     * attempt 0 while the mappers behind it ("c", "d") report attempt 1.
     */
    @Test
    public void batchFailoverWithKeyByBarrier() throws Exception {
        StreamExecutionEnvironment env = getExecutionEnvironment();

        DataStreamSource<String> source = env.fromData("foo", "bar");

        SingleOutputStreamOperator<String> mapped =
                source.map(new SuffixAttemptId("a"))
                        .map(new SuffixAttemptId("b"))
                        .keyBy(in -> in)
                        .map(new SuffixAttemptId("c"))
                        .map(new OnceFailingMapper("d"));

        try (CloseableIterator<String> result = mapped.executeAndCollect()) {
            List<String> collected = CollectionUtil.iteratorToList(result);
            Assert.assertThat(
                    collected,
                    Matchers.containsInAnyOrder("foo-a0-b0-c1-d1", "bar-a0-b0-c1-d1"));
        }
    }

    /**
     * Runs a map chain split by a {@code rebalance()} and lets the last mapper fail on its first
     * attempt.
     *
     * <p>The expected output pins that the mappers in front of the {@code rebalance()} ("a", "b")
     * report attempt 0 while the mappers behind it ("c", "d") report attempt 1.
     */
    @Test
    public void batchFailoverWithRebalanceBarrier() throws Exception {
        StreamExecutionEnvironment env = getExecutionEnvironment();

        DataStreamSource<String> source = env.fromData("foo", "bar");

        SingleOutputStreamOperator<String> mapped =
                source.map(new SuffixAttemptId("a"))
                        .map(new SuffixAttemptId("b"))
                        .rebalance()
                        .map(new SuffixAttemptId("c"))
                        .map(new OnceFailingMapper("d"));

        try (CloseableIterator<String> result = mapped.executeAndCollect()) {
            List<String> collected = CollectionUtil.iteratorToList(result);
            Assert.assertThat(
                    collected,
                    Matchers.containsInAnyOrder("foo-a0-b0-c1-d1", "bar-a0-b0-c1-d1"));
        }
    }

    /**
     * Runs a map chain split by a {@code rescale()} (parallelism 1 in front, 2 behind) and lets
     * the last mapper fail on its first attempt.
     *
     * <p>The expected output pins that the mappers in front of the {@code rescale()} ("a", "b")
     * report attempt 0 while the mappers behind it ("c", "d") report attempt 1.
     */
    @Test
    public void batchFailoverWithRescaleBarrier() throws Exception {
        StreamExecutionEnvironment env = getExecutionEnvironment();

        DataStreamSource<String> source = env.fromData("foo", "bar");
        env.setParallelism(1);

        SingleOutputStreamOperator<String> mapped =
                source.map(new SuffixAttemptId("a"))
                        .map(new SuffixAttemptId("b"))
                        .rescale()
                        .map(new SuffixAttemptId("c"))
                        .setParallelism(2)
                        .map(new OnceFailingMapper("d"))
                        .setParallelism(2);

        try (CloseableIterator<String> result = mapped.executeAndCollect()) {
            List<String> collected = CollectionUtil.iteratorToList(result);
            Assert.assertThat(
                    collected,
                    Matchers.containsInAnyOrder("foo-a0-b0-c1-d1", "bar-a0-b0-c1-d1"));
        }
    }

    /**
     * Reduces the sequence 0..10 keyed by parity and expects exactly one sum per key: 30 for the
     * even numbers and 25 for the odd ones.
     */
    @Test
    public void batchReduceSingleResultPerKey() throws Exception {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        DataStreamSource<Long> numbers = env.fromSequence(0L, 10L);

        // Two keys only: 0 (even values) and 1 (odd values).
        KeyedStream<Long, Long> stream = numbers.keyBy(i -> i % 2L);
        SingleOutputStreamOperator<Long> sums = stream.reduce(Long::sum);

        try (CloseableIterator<Long> sumsIterator = sums.executeAndCollect()) {
            List<Long> results = CollectionUtil.iteratorToList(sumsIterator);
            Assert.assertThat(results, CoreMatchers.equalTo(Arrays.asList(30L, 25L)));
        }
    }

    /**
     * Applies the built-in {@code sum(0)} aggregation to the sequence 0..10 keyed by parity and
     * expects exactly one sum per key: 30 for the even numbers and 25 for the odd ones.
     */
    @Test
    public void batchSumSingleResultPerKey() throws Exception {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        DataStreamSource<Long> numbers = env.fromSequence(0L, 10L);

        // Two keys only: 0 (even values) and 1 (odd values).
        KeyedStream<Long, Long> stream = numbers.keyBy(i -> i % 2L);
        SingleOutputStreamOperator<Long> sums = stream.sum(0);

        try (CloseableIterator<Long> sumsIterator = sums.executeAndCollect()) {
            List<Long> results = CollectionUtil.iteratorToList(sumsIterator);
            Assert.assertThat(results, CoreMatchers.equalTo(Arrays.asList(30L, 25L)));
        }
    }

    /**
     * Connects a keyed first input with a non-keyed second input through an identity two-input
     * operator.
     *
     * <p>The expected output pins the ordering: all records of the non-keyed input come first in
     * their source order, followed by the keyed records grouped by key ("regular1" before
     * "regular2") and ascending by their {@code f1} value within a key.
     */
    @Test
    public void batchKeyedNonKeyedTwoInputOperator() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // f1 doubles as the event timestamp of each record.
        SingleOutputStreamOperator<Tuple2<String, Integer>> keyedInput =
                env.fromData(
                                Tuple2.of("regular2", 4),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular2", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        SingleOutputStreamOperator<Tuple2<String, Integer>> regularInput =
                env.fromData(
                                Tuple2.of("regular4", 4),
                                Tuple2.of("regular3", 3),
                                Tuple2.of("regular3", 2),
                                Tuple2.of("regular4", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        KeyedStream<Tuple2<String, Integer>, String> keyedByName =
                keyedInput.keyBy(in -> in.f0);

        SingleOutputStreamOperator<String> result =
                keyedByName
                        .connect(regularInput)
                        .transform(
                                "operator",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                new TwoInputIdentityOperator());

        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular4,4)",
                                    "(regular3,3)",
                                    "(regular3,2)",
                                    "(regular4,1)",
                                    "(regular1,2)",
                                    "(regular1,3)",
                                    "(regular2,1)",
                                    "(regular2,4)")));
        }
    }

    /**
     * Connects a non-keyed first input with a keyed second input through an identity two-input
     * operator.
     *
     * <p>The expected output pins the ordering: all records of the non-keyed input come first in
     * their source order, followed by the keyed records grouped by key ("regular1" before
     * "regular2") and ascending by their {@code f1} value within a key.
     */
    @Test
    public void batchNonKeyedKeyedTwoInputOperator() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // f1 doubles as the event timestamp of each record.
        SingleOutputStreamOperator<Tuple2<String, Integer>> keyedInput =
                env.fromData(
                                Tuple2.of("regular2", 4),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular2", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        SingleOutputStreamOperator<Tuple2<String, Integer>> regularInput =
                env.fromData(
                                Tuple2.of("regular4", 4),
                                Tuple2.of("regular3", 3),
                                Tuple2.of("regular3", 2),
                                Tuple2.of("regular4", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        KeyedStream<Tuple2<String, Integer>, String> keyedByName =
                keyedInput.keyBy(in -> in.f0);

        SingleOutputStreamOperator<String> result =
                regularInput
                        .connect(keyedByName)
                        .transform(
                                "operator",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                new TwoInputIdentityOperator());

        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular4,4)",
                                    "(regular3,3)",
                                    "(regular3,2)",
                                    "(regular4,1)",
                                    "(regular1,2)",
                                    "(regular1,3)",
                                    "(regular2,1)",
                                    "(regular2,4)")));
        }
    }

    /**
     * Connects a keyed input with a broadcast stream and processes both with {@link
     * TestKeyedBroadcastFunction}.
     *
     * <p>The expected output pins two properties: every keyed record already observes all three
     * broadcast entries, and the keyed records are emitted grouped by key and ascending by their
     * {@code f1} value within a key.
     */
    @Test
    public void batchKeyedBroadcastExecution() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // f1 doubles as the event timestamp of each record.
        SingleOutputStreamOperator<Tuple2<String, Integer>> bcInput =
                env.fromData(Tuple2.of("bc1", 1), Tuple2.of("bc2", 2), Tuple2.of("bc3", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        SingleOutputStreamOperator<Tuple2<String, Integer>> regularInput =
                env.fromData(
                                Tuple2.of("regular1", 1),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular2", 2),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 4),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular2", 5),
                                Tuple2.of("regular1", 5),
                                Tuple2.of("regular2", 3),
                                Tuple2.of("regular1", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        BroadcastStream<Tuple2<String, Integer>> broadcastStream =
                bcInput.broadcast(STATE_DESCRIPTOR);

        SingleOutputStreamOperator<String> result =
                regularInput
                        .keyBy(input -> input.f0)
                        .connect(broadcastStream)
                        .process(new TestKeyedBroadcastFunction());

        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular1,1): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,4): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,5): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular2,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular2,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular2,5): [bc2=bc2, bc1=bc1, bc3=bc3]")));
        }
    }

    /**
     * Connects a non-keyed input with a broadcast stream and processes both with {@link
     * TestBroadcastFunction}.
     *
     * <p>The expected output pins that every regular record already observes all three broadcast
     * entries and that the regular records keep their source order.
     */
    @Test
    public void batchBroadcastExecution() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // f1 doubles as the event timestamp of each record.
        SingleOutputStreamOperator<Tuple2<String, Integer>> bcInput =
                env.fromData(Tuple2.of("bc1", 1), Tuple2.of("bc2", 2), Tuple2.of("bc3", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        SingleOutputStreamOperator<Tuple2<String, Integer>> regularInput =
                env.fromData(
                                Tuple2.of("regular1", 1),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 4),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 5),
                                Tuple2.of("regular1", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1));

        BroadcastStream<Tuple2<String, Integer>> broadcastStream =
                bcInput.broadcast(STATE_DESCRIPTOR);

        SingleOutputStreamOperator<String> result =
                regularInput.connect(broadcastStream).process(new TestBroadcastFunction());

        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular1,1): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,4): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,5): [bc2=bc2, bc1=bc1, bc3=bc3]",
                                    "(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]")));
        }
    }

    /**
     * Wires a keyed first input and a broadcast second input into a hand-built {@link
     * TwoInputTransformation} that only declares a key selector for the first input.
     *
     * <p>The expected output pins that every keyed record already observes all broadcast records
     * (in their source order) and that keyed records are emitted grouped by key and ascending by
     * their {@code f1} value within a key.
     */
    @Test
    public void batchMixedKeyedAndNonKeyedTwoInputOperator() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // f1 doubles as the event timestamp of each record.
        DataStream<Tuple2<String, Integer>> bcInput =
                env.fromData(Tuple2.of("bc3", 3), Tuple2.of("bc2", 2), Tuple2.of("bc1", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1))
                        .broadcast();

        KeyedStream<Tuple2<String, Integer>, String> regularInput =
                env.fromData(
                                Tuple2.of("regular1", 1),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 4),
                                Tuple2.of("regular2", 3),
                                Tuple2.of("regular2", 5),
                                Tuple2.of("regular1", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1))
                        .keyBy(input -> input.f0);

        TwoInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>, String>
                twoInputTransformation =
                        new TwoInputTransformation<>(
                                regularInput.getTransformation(),
                                bcInput.getTransformation(),
                                "operator",
                                new TestMixedTwoInputOperator(),
                                BasicTypeInfo.STRING_TYPE_INFO,
                                1);

        // Only the first input is keyed; the broadcast side gets no key selector.
        twoInputTransformation.setStateKeyType(BasicTypeInfo.STRING_TYPE_INFO);
        twoInputTransformation.setStateKeySelectors(input -> input.f0, null);

        DataStream<String> result = new DataStream<>(env, twoInputTransformation);

        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular1,1): [bc3, bc2, bc1]",
                                    "(regular1,2): [bc3, bc2, bc1]",
                                    "(regular1,3): [bc3, bc2, bc1]",
                                    "(regular1,3): [bc3, bc2, bc1]",
                                    "(regular1,4): [bc3, bc2, bc1]",
                                    "(regular2,3): [bc3, bc2, bc1]",
                                    "(regular2,5): [bc3, bc2, bc1]")));
        }
    }

    /**
     * Wires one keyed input and two broadcast inputs into a {@link
     * KeyedMultipleInputTransformation} backed by {@code mixedInputsOperatorFactory}.
     *
     * <p>The expected output pins that every keyed record already observes the records of both
     * broadcast inputs and that keyed records are emitted grouped by key and ascending by their
     * {@code f1} value within a key.
     */
    @Test
    public void batchMixedKeyedAndNonKeyedMultiInputOperator() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // f1 doubles as the event timestamp of each record.
        DataStream<Tuple2<String, Integer>> bc1Input =
                env.fromData(Tuple2.of("bc3", 3), Tuple2.of("bc2", 2))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1))
                        .broadcast();

        DataStream<Tuple2<String, Integer>> bc2Input =
                env.fromData(Tuple2.of("bc1", 1))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1))
                        .broadcast();

        KeyedStream<Tuple2<String, Integer>, String> regularInput =
                env.fromData(
                                Tuple2.of("regular1", 1),
                                Tuple2.of("regular1", 2),
                                Tuple2.of("regular1", 3),
                                Tuple2.of("regular1", 4),
                                Tuple2.of("regular2", 3),
                                Tuple2.of("regular2", 5),
                                Tuple2.of("regular1", 3))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner((in, ts) -> in.f1))
                        .keyBy(input -> input.f0);

        KeyedMultipleInputTransformation<String> multipleInputTransformation =
                new KeyedMultipleInputTransformation<>(
                        "operator",
                        mixedInputsOperatorFactory,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        1,
                        BasicTypeInfo.STRING_TYPE_INFO);

        // Only the first input is keyed; the broadcast inputs are registered without a selector.
        multipleInputTransformation.addInput(
                regularInput.getTransformation(),
                (KeySelector<Tuple2<String, Integer>, String>) input -> input.f0);
        multipleInputTransformation.addInput(bc1Input.getTransformation(), null);
        multipleInputTransformation.addInput(bc2Input.getTransformation(), null);

        SingleOutputStreamOperator<String> result =
                new MultipleConnectedStreams(env).transform(multipleInputTransformation);

        try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
            List<String> results = CollectionUtil.iteratorToList(resultIterator);
            Assert.assertThat(
                    results,
                    CoreMatchers.equalTo(
                            Arrays.asList(
                                    "(regular1,1): [bc3, bc2, bc1]",
                                    "(regular1,2): [bc3, bc2, bc1]",
                                    "(regular1,3): [bc3, bc2, bc1]",
                                    "(regular1,3): [bc3, bc2, bc1]",
                                    "(regular1,4): [bc3, bc2, bc1]",
                                    "(regular2,3): [bc3, bc2, bc1]",
                                    "(regular2,5): [bc3, bc2, bc1]")));
        }
    }

    /**
     * Builds the environment used by the failover and aggregation tests: BATCH runtime mode,
     * parallelism 1, checkpointing enabled and a fixed-delay restart strategy.
     *
     * @return the configured execution environment
     */
    private StreamExecutionEnvironment getExecutionEnvironment() {
        final StreamExecutionEnvironment batchEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();

        batchEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
        batchEnv.setParallelism(1);

        // NOTE(review): checkpointing is switched on although the job runs in BATCH mode;
        // presumably required for executeAndCollect() to survive the injected failure - confirm.
        batchEnv.enableCheckpointing(42L);

        // The failover tests throw once on the first attempt, so restarts must be permitted.
        RestartStrategyUtils.configureFixedDelayRestartStrategy(batchEnv, 10, 1L);

        return batchEnv;
    }

    private static class TestMixedMultipleInputOperator
    extends AbstractStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String> {
        public TestMixedMultipleInputOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 3);
        }

        public List<Input> getInputs() {
            return Arrays.asList(new AbstractInput<Tuple2<String, Integer>, String>((AbstractStreamOperatorV2)this, 1){

                public void processElement(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
                    ListState operatorState = this.getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
                    this.output.collect((Object)new StreamRecord((Object)(element.getValue() + ": " + ((Iterable)operatorState.get()).toString())));
                }
            }, new AbstractInput<Tuple2<String, Integer>, String>((AbstractStreamOperatorV2)this, 2){

                public void processElement(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
                    ListState operatorState = this.getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
                    operatorState.add((Object)((String)((Tuple2)element.getValue()).f0));
                }
            }, new AbstractInput<Tuple2<String, Integer>, String>((AbstractStreamOperatorV2)this, 3){

                public void processElement(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
                    ListState operatorState = this.getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
                    operatorState.add((Object)((String)((Tuple2)element.getValue()).f0));
                }
            });
        }
    }

    private static final class TestMixedTwoInputOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private TestMixedTwoInputOperator() {
        }

        public void processElement1(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
            ListState operatorState = this.getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
            this.output.collect((Object)new StreamRecord((Object)(element.getValue() + ": " + ((Iterable)operatorState.get()).toString())));
        }

        public void processElement2(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
            ListState operatorState = this.getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
            operatorState.add((Object)((String)((Tuple2)element.getValue()).f0));
        }
    }

    private static class TwoInputIdentityOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private TwoInputIdentityOperator() {
        }

        public void processElement1(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
            this.output.collect((Object)new StreamRecord((Object)((Tuple2)element.getValue()).toString(), element.getTimestamp()));
        }

        public void processElement2(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
            this.output.collect((Object)new StreamRecord((Object)((Tuple2)element.getValue()).toString(), element.getTimestamp()));
        }
    }

    private static class TestBroadcastFunction
    extends BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private TestBroadcastFunction() {
        }

        public void processElement(Tuple2<String, Integer> value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            ReadOnlyBroadcastState state = ctx.getBroadcastState(STATE_DESCRIPTOR);
            out.collect((Object)(value + ": " + state.immutableEntries().toString()));
        }

        public void processBroadcastElement(Tuple2<String, Integer> value, BroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            BroadcastState state = ctx.getBroadcastState(STATE_DESCRIPTOR);
            state.put((Object)((String)value.f0), (Object)((String)value.f0));
        }
    }

    private static class TestKeyedBroadcastFunction
    extends KeyedBroadcastProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private TestKeyedBroadcastFunction() {
        }

        public void processElement(Tuple2<String, Integer> value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            ReadOnlyBroadcastState state = ctx.getBroadcastState(STATE_DESCRIPTOR);
            out.collect((Object)(value + ": " + state.immutableEntries().toString()));
        }

        public void processBroadcastElement(Tuple2<String, Integer> value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            BroadcastState state = ctx.getBroadcastState(STATE_DESCRIPTOR);
            state.put((Object)((String)value.f0), (Object)((String)value.f0));
            ctx.applyToKeyedState(KEYED_STATE_DESCRIPTOR, (key, state1) -> {
                throw new RuntimeException("Shouldn't happen");
            });
        }
    }

    public static class OnceFailingMapper
    extends RichMapFunction<String, String> {
        private final String suffix;

        public OnceFailingMapper(String suffix) {
            this.suffix = suffix;
        }

        public String map(String value) throws Exception {
            if (this.getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
                throw new RuntimeException("FAILING");
            }
            return value + "-" + this.suffix + this.getRuntimeContext().getTaskInfo().getAttemptNumber();
        }
    }

    public static class SuffixAttemptId
    extends RichMapFunction<String, String> {
        private final String suffix;

        public SuffixAttemptId(String suffix) {
            this.suffix = suffix;
        }

        public String map(String value) {
            return value + "-" + this.suffix + this.getRuntimeContext().getTaskInfo().getAttemptNumber();
        }
    }
}

