/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.util.Random;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;

public class StreamingJobGraphGeneratorTest {
    @Test
    public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
        long seed = System.currentTimeMillis();
        Random r = new Random(seed);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamGraph streamingJob = new StreamGraph(env);
        StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
        boolean closureCleanerEnabled = r.nextBoolean();
        boolean forceAvroEnabled = r.nextBoolean();
        boolean forceKryoEnabled = r.nextBoolean();
        boolean objectReuseEnabled = r.nextBoolean();
        boolean sysoutLoggingEnabled = r.nextBoolean();
        int dop = 1 + r.nextInt(10);
        ExecutionConfig config = streamingJob.getExecutionConfig();
        if (closureCleanerEnabled) {
            config.enableClosureCleaner();
        } else {
            config.disableClosureCleaner();
        }
        if (forceAvroEnabled) {
            config.enableForceAvro();
        } else {
            config.disableForceAvro();
        }
        if (forceKryoEnabled) {
            config.enableForceKryo();
        } else {
            config.disableForceKryo();
        }
        if (objectReuseEnabled) {
            config.enableObjectReuse();
        } else {
            config.disableObjectReuse();
        }
        if (sysoutLoggingEnabled) {
            config.enableSysoutLogging();
        } else {
            config.disableSysoutLogging();
        }
        config.setParallelism(dop);
        JobGraph jobGraph = compiler.createJobGraph();
        String EXEC_CONFIG_KEY = "runtime.config";
        InstantiationUtil.writeObjectToConfig((Object)jobGraph.getSerializedExecutionConfig(), (Configuration)jobGraph.getJobConfiguration(), (String)"runtime.config");
        SerializedValue serializedExecutionConfig = (SerializedValue)InstantiationUtil.readObjectFromConfig((Configuration)jobGraph.getJobConfiguration(), (String)"runtime.config", (ClassLoader)Thread.currentThread().getContextClassLoader());
        Assert.assertNotNull((Object)serializedExecutionConfig);
        ExecutionConfig executionConfig = (ExecutionConfig)serializedExecutionConfig.deserializeValue(this.getClass().getClassLoader());
        Assert.assertEquals((Object)closureCleanerEnabled, (Object)executionConfig.isClosureCleanerEnabled());
        Assert.assertEquals((Object)forceAvroEnabled, (Object)executionConfig.isForceAvroEnabled());
        Assert.assertEquals((Object)forceKryoEnabled, (Object)executionConfig.isForceKryoEnabled());
        Assert.assertEquals((Object)objectReuseEnabled, (Object)executionConfig.isObjectReuseEnabled());
        Assert.assertEquals((Object)sysoutLoggingEnabled, (Object)executionConfig.isSysoutLoggingEnabled());
        Assert.assertEquals((long)dop, (long)executionConfig.getParallelism());
    }

    @Test
    public void testParallelismOneNotChained() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator input = env.fromElements((Object[])new String[]{"a", "b", "c", "d", "e", "f"}).map((MapFunction)new MapFunction<String, Tuple2<String, String>>(){

            public Tuple2<String, String> map(String value) {
                return new Tuple2((Object)value, (Object)value);
            }
        });
        SingleOutputStreamOperator result = input.keyBy(new int[]{0}).map((MapFunction)new MapFunction<Tuple2<String, String>, Tuple2<String, String>>(){

            public Tuple2<String, String> map(Tuple2<String, String> value) {
                return value;
            }
        });
        result.addSink((SinkFunction)new SinkFunction<Tuple2<String, String>>(){

            public void invoke(Tuple2<String, String> value) {
            }
        });
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setJobName("test job");
        JobGraph jobGraph = streamGraph.getJobGraph();
        Assert.assertEquals((long)2L, (long)jobGraph.getNumberOfVertices());
        Assert.assertEquals((long)1L, (long)jobGraph.getVerticesAsArray()[0].getParallelism());
        Assert.assertEquals((long)1L, (long)jobGraph.getVerticesAsArray()[1].getParallelism());
    }

    @Test
    public void testDisabledCheckpointing() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamGraph streamGraph = new StreamGraph(env);
        Assert.assertFalse((String)"Checkpointing enabled", (boolean)streamGraph.getCheckpointConfig().isCheckpointingEnabled());
        StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
        JobGraph jobGraph = jobGraphGenerator.createJobGraph();
        JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
        Assert.assertEquals((long)Long.MAX_VALUE, (long)snapshottingSettings.getCheckpointInterval());
    }
}

