/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class UnalignedCheckpointRescaleITCase
extends UnalignedCheckpointTestBase {
    /**
     * Group count. The keyed stages in this file key records by the header-stripped value
     * modulo 100, which matches this constant.
     */
    public static final int NUM_GROUPS = 100;
    /** Job graph shape exercised by this test instance. */
    private final Topology topology;
    /** Parallelism of the first run, i.e. the run configured to generate a checkpoint. */
    private final int oldParallelism;
    /** Parallelism of the second run, i.e. the run configured to restore that checkpoint. */
    private final int newParallelism;
    /** Source sleep in milliseconds; applied to the settings of the first run only. */
    private final long sourceSleepMs;

    /**
     * Supplies the parameter rows for the parameterized runner.
     *
     * <p>Each row is {@code {description, topology, oldParallelism, newParallelism,
     * sourceSleepMs}}, in the same order as the constructor arguments.
     *
     * @return one {@code Object[]} per test instance
     */
    @Parameterized.Parameters(name="{0} {1} from {2} to {3}, sourceSleepMs = {4}")
    public static Object[][] getScaleFactors() {
        Object[][] baseRows = new Object[][]{
            {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
            {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 12, 7, 0L},
            {"upscale", Topology.KEYED_DIFFERENT_PARALLELISM, 7, 12, 0L},
            {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 5, 3, 5L},
            {"upscale", Topology.KEYED_DIFFERENT_PARALLELISM, 3, 5, 5L},
            {"downscale", Topology.KEYED_BROADCAST, 7, 2, 0L},
            {"upscale", Topology.KEYED_BROADCAST, 2, 7, 0L},
            {"downscale", Topology.KEYED_BROADCAST, 5, 3, 5L},
            {"upscale", Topology.KEYED_BROADCAST, 3, 5, 5L},
            {"downscale", Topology.BROADCAST, 5, 2, 0L},
            {"upscale", Topology.BROADCAST, 2, 5, 0L},
            {"downscale", Topology.BROADCAST, 5, 3, 5L},
            {"upscale", Topology.BROADCAST, 3, 5, 5L},
            {"upscale", Topology.PIPELINE, 1, 2, 0L},
            {"upscale", Topology.PIPELINE, 2, 3, 0L},
            {"upscale", Topology.PIPELINE, 3, 7, 0L},
            {"upscale", Topology.PIPELINE, 4, 8, 0L},
            {"upscale", Topology.PIPELINE, 20, 21, 0L},
            {"upscale", Topology.PIPELINE, 3, 5, 5L},
            {"downscale", Topology.PIPELINE, 2, 1, 0L},
            {"downscale", Topology.PIPELINE, 3, 2, 0L},
            {"downscale", Topology.PIPELINE, 7, 3, 0L},
            {"downscale", Topology.PIPELINE, 8, 4, 0L},
            {"downscale", Topology.PIPELINE, 21, 20, 0L},
            {"downscale", Topology.PIPELINE, 5, 3, 5L},
            {"no scale", Topology.PIPELINE, 1, 1, 0L},
            {"no scale", Topology.PIPELINE, 3, 3, 0L},
            {"no scale", Topology.PIPELINE, 7, 7, 0L},
            {"no scale", Topology.PIPELINE, 20, 20, 0L},
            {"upscale", Topology.UNION, 1, 2, 0L},
            {"upscale", Topology.UNION, 2, 3, 0L},
            {"upscale", Topology.UNION, 3, 7, 0L},
            {"upscale", Topology.UNION, 3, 5, 5L},
            {"downscale", Topology.UNION, 2, 1, 0L},
            {"downscale", Topology.UNION, 3, 2, 0L},
            {"downscale", Topology.UNION, 7, 3, 0L},
            {"downscale", Topology.UNION, 5, 3, 5L},
            {"no scale", Topology.UNION, 1, 1, 0L},
            {"no scale", Topology.UNION, 7, 7, 0L},
            {"upscale", Topology.MULTI_INPUT, 1, 2, 0L},
            {"upscale", Topology.MULTI_INPUT, 2, 3, 0L},
            {"upscale", Topology.MULTI_INPUT, 3, 7, 0L},
            {"upscale", Topology.MULTI_INPUT, 3, 5, 5L},
            {"downscale", Topology.MULTI_INPUT, 2, 1, 0L},
            {"downscale", Topology.MULTI_INPUT, 3, 2, 0L},
            {"downscale", Topology.MULTI_INPUT, 7, 3, 0L},
            {"downscale", Topology.MULTI_INPUT, 5, 3, 5L},
            {"no scale", Topology.MULTI_INPUT, 1, 1, 0L},
            {"no scale", Topology.MULTI_INPUT, 7, 7, 0L},
        };

        // Each row is passed through ArrayUtils.insert with an empty set of extra values appended
        // at the end, so the row contents are unchanged; the call is kept as the hook where
        // additional per-row parameters would be added.
        Object[][] expandedRows = new Object[baseRows.length][];
        for (int rowIndex = 0; rowIndex < baseRows.length; ++rowIndex) {
            Object[] row = baseRows[rowIndex];
            expandedRows[rowIndex] = ArrayUtils.insert(row.length, row, new Object[0]);
        }
        return expandedRows;
    }

    /**
     * Creates one parameterized test instance.
     *
     * @param desc human-readable label; it is not stored and only appears as {@code {0}} in the
     *     runner's test-name pattern
     * @param topology job graph shape to build
     * @param oldParallelism parallelism of the checkpoint-generating run
     * @param newParallelism parallelism of the restoring run
     * @param sourceSleepMs source sleep, in milliseconds, for the checkpoint-generating run
     */
    public UnalignedCheckpointRescaleITCase(
            String desc,
            Topology topology,
            int oldParallelism,
            int newParallelism,
            long sourceSleepMs) {
        this.sourceSleepMs = sourceSleepMs;
        this.newParallelism = newParallelism;
        this.oldParallelism = oldParallelism;
        this.topology = topology;
    }

    /**
     * Executes the topology twice: first at {@code oldParallelism} with checkpoint generation
     * enabled, then at {@code newParallelism} restoring from the directory returned by the first
     * run. Both runs expect exactly one failure.
     *
     * @throws Exception if either execution fails
     */
    @Test
    public void shouldRescaleUnalignedCheckpoint() throws Exception {
        // First run: old parallelism, produces the checkpoint to restore from.
        UnalignedCheckpointTestBase.UnalignedSettings beforeRescale =
                new UnalignedCheckpointTestBase.UnalignedSettings(this.topology)
                        .setParallelism(this.oldParallelism)
                        .setExpectedFailures(1)
                        .setSourceSleepMs(this.sourceSleepMs);
        beforeRescale.setGenerateCheckpoint(true);
        File generatedCheckpoint = super.execute(beforeRescale);

        // Second run: new parallelism, resumes from the checkpoint (no source sleep configured).
        UnalignedCheckpointTestBase.UnalignedSettings afterRescale =
                new UnalignedCheckpointTestBase.UnalignedSettings(this.topology)
                        .setParallelism(this.newParallelism)
                        .setExpectedFailures(1);
        afterRescale.setRestoreCheckpoint(generatedCheckpoint);
        super.execute(afterRescale);
    }

    /**
     * Verifies the job accumulators: the {@code outputs} counter must equal the {@code inputs_}
     * counter, and, for every topology except {@code CUSTOM_PARTITIONER}, the
     * {@code duplicates} counter must be zero.
     *
     * @param result execution result carrying the accumulator values
     */
    @Override
    protected void checkCounters(JobExecutionResult result) {
        Long observedOutputs = (Long) result.getAccumulatorResult("outputs");
        Long observedInputs = (Long) result.getAccumulatorResult("inputs_");
        this.collector.checkThat(
                "NUM_OUTPUTS = NUM_INPUTS", observedOutputs, Matchers.equalTo(observedInputs));

        // The custom-partitioner topology is exempt from the duplicate check.
        if (this.topology.equals(Topology.CUSTOM_PARTITIONER)) {
            return;
        }
        Long observedDuplicates = (Long) result.getAccumulatorResult("duplicates");
        this.collector.checkThat("NUM_DUPLICATES", observedDuplicates, Matchers.equalTo(0L));
    }

    private static class BackPressureInducingSink<T>
    implements SinkFunction<T> {
        private BackPressureInducingSink() {
        }

        public void invoke(T value, SinkFunction.Context ctx) throws Exception {
            Thread.sleep(1L);
        }
    }

    private static class StringPartitioner
    implements Partitioner<String> {
        private StringPartitioner() {
        }

        public int partition(String key, int numPartitions) {
            return Integer.parseInt(key) % numPartitions;
        }
    }

    private static class UnionLikeCoGroup
    implements CoMapFunction<Long, Long, Long> {
        private UnionLikeCoGroup() {
        }

        public Long map1(Long value) throws Exception {
            return UnalignedCheckpointTestBase.checkHeader(value);
        }

        public Long map2(Long value) throws Exception {
            return UnalignedCheckpointTestBase.checkHeader(value);
        }
    }

    private static class InputCountFunction
    extends RichMapFunction<Long, Long>
    implements CheckpointedFunction {
        private static final long serialVersionUID = -1098571965968341646L;
        private final LongCounter numInputCounter = new LongCounter();
        private ListState<Long> state;

        private InputCountFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            this.getRuntimeContext().addAccumulator("inputs_", (Accumulator)this.numInputCounter);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor descriptor = new ListStateDescriptor("num-inputs", Types.LONG);
            this.state = context.getOperatorStateStore().getListState(descriptor);
            for (Long numInputs : (Iterable)this.state.get()) {
                this.numInputCounter.add(numInputs);
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.state.update(Collections.singletonList(this.numInputCounter.getLocalValue()));
        }

        public Long map(Long value) throws Exception {
            this.numInputCounter.add(1L);
            return UnalignedCheckpointTestBase.checkHeader(value);
        }
    }

    private static class StatefulKeyedMap
    extends RichMapFunction<Long, Long> {
        private static final ValueStateDescriptor<Long> DESC = new ValueStateDescriptor("group", (TypeSerializer)LongSerializer.INSTANCE);
        ValueState<Long> state;

        private StatefulKeyedMap() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.state = this.getRuntimeContext().getState(DESC);
        }

        public Long map(Long value) throws Exception {
            Long lastGroup = (Long)this.state.value();
            long rawValue = UnalignedCheckpointTestBase.withoutHeader(value);
            long group = rawValue % 100L;
            if (lastGroup != null) {
                Preconditions.checkState((group == lastGroup ? 1 : 0) != 0, (Object)"Mismatched key group");
            } else {
                this.state.update((Object)group);
            }
            return value;
        }
    }

    /**
     * Sink that tracks which values (after header removal) it has received in a bit set, counts
     * outputs and duplicates, and on close derives the number of values that were never seen
     * below the highest seen value.
     */
    protected static class VerifyingSink
    extends UnalignedCheckpointTestBase.VerifyingSinkBase<State> {
        /** Ensures only the first duplicate is logged. */
        private boolean firstDuplicate = true;

        protected VerifyingSink(long minCheckpoints, long checkpointingInterval) {
            super(minCheckpoints, checkpointingInterval);
        }

        @Override
        protected State createState() {
            return new State();
        }

        /**
         * Records one value: bumps the duplicate counter if the value was seen before, marks it
         * as seen, bumps the output counter, and finally calls {@code induceBackpressure()}.
         *
         * @param value incoming record, including its header
         * @param context sink context (unused)
         */
        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            int number = (int)UnalignedCheckpointTestBase.withoutHeader(value);
            State sinkState = (State)this.state;

            boolean seenBefore = sinkState.encounteredNumbers.get(number);
            if (seenBefore) {
                ++sinkState.numDuplicates;
                if (this.firstDuplicate) {
                    UnalignedCheckpointTestBase.LOG.info(
                            "Duplicate record {} @ {} subtask ({} attempt)",
                            number,
                            this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                            this.getRuntimeContext().getTaskInfo().getAttemptNumber());
                    this.firstDuplicate = false;
                }
            }
            sinkState.encounteredNumbers.set(number);
            ++sinkState.numOutput;
            this.induceBackpressure();
        }

        /**
         * Stores the count of unset bits below the highest set bit as {@code numLostValues},
         * then delegates to the base class.
         */
        @Override
        public void close() throws Exception {
            State sinkState = (State)this.state;
            BitSet seen = sinkState.encounteredNumbers;
            sinkState.numLostValues = seen.length() - seen.cardinality();
            super.close();
        }

        /** Sink state extended with the set of values received so far. */
        static class State
        extends UnalignedCheckpointTestBase.VerifyingSinkStateBase {
            private final BitSet encounteredNumbers = new BitSet();
        }
    }

    static enum Topology implements UnalignedCheckpointTestBase.DagCreator
    {
        /** A single, unfiltered source pipeline feeding the failing mapper and verifying sink. */
        PIPELINE{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMillis) {
                // Only one input exists here, so nothing is filtered out.
                FilterFunction<Long> acceptAll = val -> true;
                DataStream<Long> pipeline =
                        createSourcePipeline(
                                env,
                                minCheckpoints,
                                slotSharing,
                                expectedRestarts,
                                env.getParallelism(),
                                0,
                                sourceSleepMillis,
                                acceptAll);
                addFailingSink(pipeline, minCheckpoints, slotSharing);
            }
        }
        ,
        /**
         * Three source pipelines, each keeping the records whose header-stripped value modulo 3
         * equals its input index, chained together with {@code connect} and a two-input map.
         */
        MULTI_INPUT{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                final int numSources = 3;
                int parallelism = env.getParallelism();
                DataStream<Long> combinedSource = null;
                for (int inputIndex = 0; inputIndex < numSources; ++inputIndex) {
                    // Lambdas can only capture effectively final locals.
                    int finalInputIndex = inputIndex;
                    DataStream<Long> source =
                            createSourcePipeline(
                                    env,
                                    minCheckpoints,
                                    slotSharing,
                                    expectedRestarts,
                                    parallelism,
                                    inputIndex,
                                    sourceSleepMs,
                                    val -> UnalignedCheckpointTestBase.withoutHeader(val) % numSources == finalInputIndex);
                    // The first pipeline seeds the chain; each later one is connected to the
                    // stream built so far.
                    combinedSource =
                            combinedSource == null
                                    ? source
                                    : combinedSource
                                            .connect(source)
                                            .map(new UnionLikeCoGroup())
                                            .name("min" + inputIndex)
                                            .uid("min" + inputIndex)
                                            .slotSharingGroup(slotSharing ? "default" : "min" + inputIndex);
                }
                addFailingSink(combinedSource, minCheckpoints, slotSharing);
            }
        }
        ,
        /**
         * Two source pipelines with different parallelism (half and a third of the environment
         * parallelism) carrying even and odd values respectively, connected and keyed by
         * {@code value % 100} into a keyed co-process function.
         */
        KEYED_DIFFERENT_PARALLELISM{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                int parallelism = env.getParallelism();
                DataStream<Long> source1 =
                        createSourcePipeline(
                                env,
                                minCheckpoints,
                                slotSharing,
                                expectedRestarts,
                                parallelism / 2,
                                0,
                                sourceSleepMs,
                                val -> UnalignedCheckpointTestBase.withoutHeader(val) % 2L == 0L);
                DataStream<Long> source2 =
                        createSourcePipeline(
                                env,
                                minCheckpoints,
                                slotSharing,
                                expectedRestarts,
                                parallelism / 3,
                                1,
                                sourceSleepMs,
                                val -> UnalignedCheckpointTestBase.withoutHeader(val) % 2L == 1L);

                // Both inputs share the same key extraction so matching groups meet in one task.
                KeySelector<Long, Long> keySelector =
                        i -> UnalignedCheckpointTestBase.withoutHeader(i) % 100L;
                SingleOutputStreamOperator<Long> connected =
                        source1.connect(source2)
                                .keyBy(keySelector, keySelector)
                                .process(new TestKeyedCoProcessFunction())
                                .setParallelism(parallelism);
                addFailingSink(connected, minCheckpoints, slotSharing);
            }
        }
        ,
        /**
         * Three source pipelines, each keeping the records whose header-stripped value modulo 3
         * equals its input index, merged with {@code union}.
         */
        UNION{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                final int numSources = 3;
                int parallelism = env.getParallelism();
                DataStream<Long> combinedSource = null;
                for (int inputIndex = 0; inputIndex < numSources; ++inputIndex) {
                    // Lambdas can only capture effectively final locals.
                    int finalInputIndex = inputIndex;
                    DataStream<Long> source =
                            createSourcePipeline(
                                    env,
                                    minCheckpoints,
                                    slotSharing,
                                    expectedRestarts,
                                    parallelism,
                                    inputIndex,
                                    sourceSleepMs,
                                    val -> UnalignedCheckpointTestBase.withoutHeader(val) % numSources == finalInputIndex);
                    combinedSource = combinedSource == null ? source : combinedSource.union(source);
                }
                addFailingSink(combinedSource, minCheckpoints, slotSharing);
            }
        }
        ,
        /**
         * One source pipeline connected with a broadcast side input. The process function
         * forwards main-input records and ignores broadcast elements; it runs at twice the
         * environment parallelism.
         */
        BROADCAST{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                int parallelism = env.getParallelism();
                DataStream<Long> broadcastSide =
                        env.fromSource(
                                new UnalignedCheckpointTestBase.LongSource(
                                        minCheckpoints,
                                        parallelism,
                                        expectedRestarts,
                                        env.getCheckpointInterval(),
                                        sourceSleepMs),
                                WatermarkStrategy.noWatermarks(),
                                "source");
                DataStream<Long> source =
                        createSourcePipeline(
                                        env,
                                        minCheckpoints,
                                        slotSharing,
                                        expectedRestarts,
                                        parallelism,
                                        0,
                                        sourceSleepMs,
                                        val -> true)
                                .map(i -> UnalignedCheckpointTestBase.checkHeader(i))
                                .name("map")
                                .uid("map")
                                .slotSharingGroup(slotSharing ? "default" : "failing-map");

                MapStateDescriptor<Long, Long> descriptor =
                        new MapStateDescriptor<>(
                                "broadcast", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
                BroadcastStream<Long> broadcast = broadcastSide.broadcast(descriptor);
                SingleOutputStreamOperator<Long> joined =
                        source.connect(broadcast)
                                .process(new TestBroadcastProcessFunction())
                                .setParallelism(2 * parallelism);
                addFailingSink(joined, minCheckpoints, slotSharing);
            }
        }
        ,
        /**
         * One source pipeline, keyed by {@code value % 100}, connected with the union of three
         * parallelism-1 broadcast sources. The keyed broadcast process function forwards
         * main-input records and ignores broadcast elements; it runs at the environment
         * parallelism plus two.
         */
        KEYED_BROADCAST{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                int parallelism = env.getParallelism();
                DataStream<Long> broadcastSide1 =
                        env.fromSource(
                                        new UnalignedCheckpointTestBase.LongSource(
                                                minCheckpoints, 1, expectedRestarts, env.getCheckpointInterval(), sourceSleepMs),
                                        WatermarkStrategy.noWatermarks(),
                                        "source-1")
                                .setParallelism(1);
                DataStream<Long> broadcastSide2 =
                        env.fromSource(
                                        new UnalignedCheckpointTestBase.LongSource(
                                                minCheckpoints, 1, expectedRestarts, env.getCheckpointInterval(), sourceSleepMs),
                                        WatermarkStrategy.noWatermarks(),
                                        "source-2")
                                .setParallelism(1);
                DataStream<Long> broadcastSide3 =
                        env.fromSource(
                                        new UnalignedCheckpointTestBase.LongSource(
                                                minCheckpoints, 1, expectedRestarts, env.getCheckpointInterval(), sourceSleepMs),
                                        WatermarkStrategy.noWatermarks(),
                                        "source-3")
                                .setParallelism(1);
                DataStream<Long> source =
                        createSourcePipeline(
                                        env,
                                        minCheckpoints,
                                        slotSharing,
                                        expectedRestarts,
                                        parallelism,
                                        0,
                                        sourceSleepMs,
                                        val -> true)
                                .map(i -> UnalignedCheckpointTestBase.checkHeader(i))
                                .name("map")
                                .uid("map")
                                .slotSharingGroup(slotSharing ? "default" : "failing-map");

                MapStateDescriptor<Long, Long> descriptor =
                        new MapStateDescriptor<>(
                                "broadcast", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
                DataStream<Long> broadcastSide =
                        broadcastSide1.union(broadcastSide2).union(broadcastSide3);
                BroadcastStream<Long> broadcast = broadcastSide.broadcast(descriptor);
                SingleOutputStreamOperator<Long> joined =
                        source.keyBy(i -> UnalignedCheckpointTestBase.withoutHeader(i) % 100L)
                                .connect(broadcast)
                                .process(new TestKeyedBroadcastProcessFunction())
                                .setParallelism(parallelism + 2);
                addFailingSink(joined, minCheckpoints, slotSharing);
            }
        }
        ,
        /**
         * Source, long-to-string map, failing mapper, then a custom partitioner in front of a
         * slow sink with a fixed parallelism of three. Each record becomes a string of the form
         * {@code "<partition> <index> <padding>"} and is routed by its first token.
         */
        CUSTOM_PARTITIONER{
            /** Parallelism of the sink; also the modulus that selects a record's partition. */
            final int sinkParallelism = 3;

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                int parallelism = env.getParallelism();
                env.fromSource(
                                new UnalignedCheckpointTestBase.LongSource(
                                        minCheckpoints,
                                        parallelism,
                                        expectedRestarts,
                                        env.getCheckpointInterval(),
                                        sourceSleepMs),
                                WatermarkStrategy.noWatermarks(),
                                "source")
                        .name("source")
                        .uid("source")
                        .map(
                                new MapFunction<Long, String>() {

                                    @Override
                                    public String map(Long value) throws Exception {
                                        long raw = UnalignedCheckpointTestBase.withoutHeader(value);
                                        // buildString belongs to the enclosing enum constant,
                                        // so it must not be qualified with this function's
                                        // own `this`.
                                        return buildString(raw % sinkParallelism, raw / sinkParallelism);
                                    }
                                })
                        .name("long-to-string-map")
                        .uid("long-to-string-map")
                        .map(
                                new UnalignedCheckpointTestBase.FailingMapper<>(
                                        state -> false,
                                        // second predicate: true once half of the required
                                        // checkpoints completed, during run number 0 only
                                        state -> state.completedCheckpoints >= minCheckpoints / 2 && state.runNumber == 0L,
                                        state -> false,
                                        state -> false))
                        .name("failing-map")
                        .uid("failing-map")
                        .setParallelism(parallelism)
                        .partitionCustom(new StringPartitioner(), str -> str.split(" ")[0])
                        .addSink(new BackPressureInducingSink<>())
                        .name("sink")
                        .uid("sink")
                        .setParallelism(sinkParallelism);
            }

            /**
             * Builds the record payload.
             *
             * @param partition first token; used as the partitioning key
             * @param index second token
             * @return {@code partition + " " + index + " "} followed by 3713 {@code U+FFFF} chars
             */
            private String buildString(long partition, long index) {
                String longStr = new String(new char[3713]).replace('\u0000', '\uffff');
                return partition + " " + index + " " + longStr;
            }
        };


        /**
         * Appends the common tail of the job: shuffle, failing mapper, shuffle, and a
         * parallelism-1 verifying sink.
         *
         * <p>Of the four predicates given to the failing mapper only the second can return
         * true: once at least half of {@code minCheckpoints} checkpoints completed, and only
         * while the run number is 0.
         *
         * @param combinedSource stream to terminate
         * @param minCheckpoints minimum number of checkpoints; forwarded to the sink
         * @param slotSharing if true all operators use the {@code "default"} slot sharing
         *     group, otherwise each operator gets its own group
         */
        void addFailingSink(DataStream<Long> combinedSource, long minCheckpoints, boolean slotSharing) {
            combinedSource
                    .shuffle()
                    .map(
                            new UnalignedCheckpointTestBase.FailingMapper<>(
                                    state -> false,
                                    state -> state.completedCheckpoints >= minCheckpoints / 2L && state.runNumber == 0L,
                                    state -> false,
                                    state -> false))
                    .name("failing-map")
                    .uid("failing-map")
                    .slotSharingGroup(slotSharing ? "default" : "failing-map")
                    .shuffle()
                    .addSink(
                            new VerifyingSink(
                                    minCheckpoints,
                                    combinedSource.getExecutionEnvironment().getCheckpointInterval()))
                    .setParallelism(1)
                    .name("sink")
                    .uid("sink")
                    .slotSharingGroup(slotSharing ? "default" : "sink");
        }

        /**
         * Builds one source pipeline: source, filter, input counter, then a sequence of
         * header-checking maps separated by different partitioning schemes (global, rebalance,
         * shuffle, shuffle, keyBy, rescale) and parallelism settings.
         *
         * <p>Every operator name and uid is suffixed with {@code inputIndex}.
         *
         * @param env environment to add the source to
         * @param minCheckpoints forwarded to the source
         * @param slotSharing if true all operators use the {@code "default"} slot sharing
         *     group, otherwise operators get per-stage groups
         * @param expectedRestarts forwarded to the source
         * @param parallelism forwarded to the source; also the base for the explicit stage
         *     parallelism settings
         * @param inputIndex suffix distinguishing multiple pipelines in one job
         * @param sourceSleepMs forwarded to the source
         * @param sourceFilter selects the records this pipeline keeps
         * @return the output of the final ("rescale") stage
         */
        DataStream<Long> createSourcePipeline(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, int parallelism, int inputIndex, long sourceSleepMs, FilterFunction<Long> sourceFilter) {
            return env.fromSource(
                            new UnalignedCheckpointTestBase.LongSource(
                                    minCheckpoints,
                                    parallelism,
                                    expectedRestarts,
                                    env.getCheckpointInterval(),
                                    sourceSleepMs),
                            WatermarkStrategy.noWatermarks(),
                            "source" + inputIndex)
                    .uid("source" + inputIndex)
                    .slotSharingGroup(slotSharing ? "default" : "source" + inputIndex)
                    // source, filter and counter share the "source" group
                    .filter(sourceFilter)
                    .name("input-filter" + inputIndex)
                    .uid("input-filter" + inputIndex)
                    .slotSharingGroup(slotSharing ? "default" : "source" + inputIndex)
                    .map(new InputCountFunction())
                    .name("input-counter" + inputIndex)
                    .uid("input-counter" + inputIndex)
                    .slotSharingGroup(slotSharing ? "default" : "source" + inputIndex)
                    .global()
                    .map(i -> UnalignedCheckpointTestBase.checkHeader(i))
                    .name("global" + inputIndex)
                    .uid("global" + inputIndex)
                    .slotSharingGroup(slotSharing ? "default" : "global" + inputIndex)
                    .rebalance()
                    .map(i -> UnalignedCheckpointTestBase.checkHeader(i))
                    .setParallelism(parallelism + 1)
                    .name("rebalance" + inputIndex)
                    .uid("rebalance" + inputIndex)
                    .slotSharingGroup(slotSharing ? "default" : "rebalance" + inputIndex)
                    .shuffle()
                    .map(i -> UnalignedCheckpointTestBase.checkHeader(i))
                    .name("upscale" + inputIndex)
                    .uid("upscale" + inputIndex)
                    .setParallelism(2 * parallelism)
                    .slotSharingGroup(slotSharing ? "default" : "upscale" + inputIndex)
                    .shuffle()
                    .map(i -> UnalignedCheckpointTestBase.checkHeader(i))
                    .name("downscale" + inputIndex)
                    .uid("downscale" + inputIndex)
                    .setParallelism(parallelism + 1)
                    .slotSharingGroup(slotSharing ? "default" : "downscale" + inputIndex)
                    // 100 is the group count (same value as NUM_GROUPS in the enclosing class)
                    .keyBy(i -> UnalignedCheckpointTestBase.withoutHeader(i) % 100L)
                    .map(new StatefulKeyedMap())
                    .name("keyby" + inputIndex)
                    .uid("keyby" + inputIndex)
                    .slotSharingGroup(slotSharing ? "default" : "keyby" + inputIndex)
                    .rescale()
                    .map(i -> UnalignedCheckpointTestBase.checkHeader(i))
                    .name("rescale" + inputIndex)
                    .uid("rescale" + inputIndex)
                    .setParallelism(Math.max(parallelism + 1, parallelism * 3 / 2))
                    .slotSharingGroup(slotSharing ? "default" : "rescale" + inputIndex);
        }

        /**
         * Returns the constant's name in lower case.
         *
         * @return lower-cased enum constant name
         */
        @Override
        public String toString() {
            // Locale.ROOT keeps the result independent of the JVM default locale (e.g. the
            // Turkish locale would otherwise map 'I' to a dotless 'ı').
            return this.name().toLowerCase(java.util.Locale.ROOT);
        }

        private static class TestKeyedBroadcastProcessFunction
        extends KeyedBroadcastProcessFunction<Long, Long, Long, Long> {
            private static final long serialVersionUID = 7852973507735751404L;

            TestKeyedBroadcastProcessFunction() {
            }

            public void processElement(Long value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<Long> out) {
                out.collect((Object)UnalignedCheckpointTestBase.checkHeader(value));
            }

            public void processBroadcastElement(Long value, KeyedBroadcastProcessFunction.Context ctx, Collector<Long> out) {
            }
        }

        private static class TestKeyedCoProcessFunction
        extends KeyedCoProcessFunction<Long, Long, Long, Long> {
            private static final long serialVersionUID = 1L;

            TestKeyedCoProcessFunction() {
            }

            public void processElement1(Long value, KeyedCoProcessFunction.Context ctx, Collector<Long> out) throws Exception {
                out.collect((Object)UnalignedCheckpointTestBase.checkHeader(value));
            }

            public void processElement2(Long value, KeyedCoProcessFunction.Context ctx, Collector<Long> out) throws Exception {
                out.collect((Object)UnalignedCheckpointTestBase.checkHeader(value));
            }
        }

        private static class TestBroadcastProcessFunction
        extends BroadcastProcessFunction<Long, Long, Long> {
            private static final long serialVersionUID = 7852973507735751404L;

            TestBroadcastProcessFunction() {
            }

            public void processElement(Long value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<Long> out) {
                out.collect((Object)UnalignedCheckpointTestBase.checkHeader(value));
            }

            public void processBroadcastElement(Long value, BroadcastProcessFunction.Context ctx, Collector<Long> out) {
            }
        }
    }
}

