/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link StateBootstrapTransformation}: the parallelism of the stream returned by
 * {@code writeOperatorSubtaskStates} and the key selector stored in the {@link StreamConfig}
 * returned by {@code getConfig}.
 */
public class StateBootstrapTransformationTest extends AbstractTestBase {

    /**
     * With an environment parallelism of 10 and a savepoint max parallelism of 4, a broadcast
     * bootstrap transformation is expected to write its state at parallelism 1.
     */
    @Test
    public void testBroadcastStateTransformationParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStream<Integer> input = env.fromElements(0);
        StateBootstrapTransformation<Integer> transformation =
                OperatorTransformation.bootstrapWith(input)
                        .transform(new ExampleBroadcastStateBootstrapFunction());

        Assert.assertEquals(
                "Broadcast transformations should always be run at parallelism 1",
                1,
                writtenStateParallelism(transformation, 4));
    }

    /**
     * With an environment parallelism of 4 and a savepoint max parallelism of 10, the written
     * stream is expected to keep the environment parallelism.
     */
    @Test
    public void testDefaultParallelismRespectedWhenLessThanMaxParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Integer> input = env.fromElements(0);
        StateBootstrapTransformation<Integer> transformation =
                OperatorTransformation.bootstrapWith(input)
                        .transform(new ExampleStateBootstrapFunction());

        Assert.assertEquals(
                "The parallelism of a data set should not change when less than the max parallelism of the savepoint",
                env.getParallelism(),
                writtenStateParallelism(transformation, 10));
    }

    /**
     * With an environment parallelism of 10 and a savepoint max parallelism of 4, the written
     * stream is expected to run at parallelism 4.
     */
    @Test
    public void testMaxParallelismRespected() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStream<Integer> input = env.fromElements(0);
        StateBootstrapTransformation<Integer> transformation =
                OperatorTransformation.bootstrapWith(input)
                        .transform(new ExampleStateBootstrapFunction());

        Assert.assertEquals(
                "The parallelism of a data set should be constrained by the savepoint max parallelism",
                4,
                writtenStateParallelism(transformation, 4));
    }

    /**
     * With an environment parallelism of 4, a savepoint max parallelism of 4 and an operator max
     * parallelism of 1 set via {@code setMaxParallelism(1)}, the written stream is expected to run
     * at parallelism 1.
     */
    @Test
    public void testOperatorSpecificMaxParallelismRespected() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Integer> input = env.fromElements(0);
        StateBootstrapTransformation<Integer> transformation =
                OperatorTransformation.bootstrapWith(input)
                        .setMaxParallelism(1)
                        .transform(new ExampleStateBootstrapFunction());

        Assert.assertEquals(
                "The parallelism of a data set should be constrained by the operator-specific max parallelism",
                1,
                writtenStateParallelism(transformation, 4));
    }

    /**
     * The key selector passed to {@code keyBy} is expected to be the state partitioner (input 0)
     * of the stream config built by the transformation.
     */
    @Test
    public void testStreamConfig() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("");
        StateBootstrapTransformation<String> transformation =
                OperatorTransformation.bootstrapWith(input)
                        .keyBy(new CustomKeySelector())
                        .transform(new ExampleKeyedStateBootstrapFunction());

        // The last argument (the operator) is deliberately null, as in the original test.
        StreamConfig config =
                transformation.getConfig(
                        OperatorIDGenerator.fromUid("uid"),
                        new HashMapStateBackend(),
                        new Configuration(),
                        null);
        KeySelector<?, ?> selector =
                config.getStatePartitioner(0, Thread.currentThread().getContextClassLoader());

        Assert.assertEquals(
                "Incorrect key selector forwarded to stream operator",
                CustomKeySelector.class,
                selector.getClass());
    }

    /**
     * Resolves the transformation's max parallelism against {@code savepointMaxParallelism},
     * writes the operator subtask states with a {@link HashMapStateBackend} and an empty
     * {@link Path}, and returns the parallelism of the resulting stream.
     *
     * @param transformation the bootstrap transformation under test
     * @param savepointMaxParallelism the value handed to {@code getMaxParallelism}
     * @return the parallelism reported by the stream of written subtask states
     */
    private static int writtenStateParallelism(
            StateBootstrapTransformation<?> transformation, int savepointMaxParallelism) {
        int maxParallelism = transformation.getMaxParallelism(savepointMaxParallelism);
        DataStream<?> written =
                transformation.writeOperatorSubtaskStates(
                        OperatorIDGenerator.fromUid("uid"),
                        new HashMapStateBackend(),
                        new Path(),
                        maxParallelism);
        return written.getParallelism();
    }

    /** Keyed bootstrap function that ignores every element. */
    private static class ExampleKeyedStateBootstrapFunction
            extends KeyedStateBootstrapFunction<String, String> {

        @Override
        public void processElement(String value, Context ctx) throws Exception {
            // Intentionally empty: only the transformation wiring is under test.
        }
    }

    /** Non-keyed bootstrap function that ignores every element and keeps no state. */
    private static class ExampleStateBootstrapFunction extends StateBootstrapFunction<Integer> {

        @Override
        public void processElement(Integer value, Context ctx) throws Exception {
            // Intentionally empty: only the transformation wiring is under test.
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Nothing to snapshot.
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Nothing to initialize.
        }
    }

    /** Broadcast bootstrap function that ignores every element. */
    private static class ExampleBroadcastStateBootstrapFunction
            extends BroadcastStateBootstrapFunction<Integer> {

        @Override
        public void processElement(Integer value, Context ctx) throws Exception {
            // Intentionally empty: only the transformation wiring is under test.
        }
    }

    /** Identity key selector; its class is what {@code testStreamConfig} looks for. */
    private static class CustomKeySelector implements KeySelector<String, String> {

        @Override
        public String getKey(String value) throws Exception {
            return value;
        }
    }
}

