/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandlerFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;

public class CheckpointExceptionHandlerConfigurationTest
extends TestLogger {
    @Test
    public void testConfigurationFailOnException() throws Exception {
        this.testConfigForwarding(true);
    }

    @Test
    public void testConfigurationDeclineOnException() throws Exception {
        this.testConfigForwarding(false);
    }

    @Test
    public void testFailIsDefaultConfig() {
        ExecutionConfig newExecutionConfig = new ExecutionConfig();
        Assert.assertTrue((boolean)newExecutionConfig.isFailTaskOnCheckpointError());
    }

    private void testConfigForwarding(boolean failOnException) throws Exception {
        final boolean expectedHandlerFlag = failOnException;
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        environment.setTaskStateManager((TaskStateManager)new TestTaskStateManager());
        environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag);
        final CheckpointExceptionHandlerFactory inspectingFactory = new CheckpointExceptionHandlerFactory(){

            public CheckpointExceptionHandler createCheckpointExceptionHandler(boolean failTaskOnCheckpointException, Environment environment) {
                Assert.assertEquals((Object)expectedHandlerFlag, (Object)failTaskOnCheckpointException);
                return super.createCheckpointExceptionHandler(failTaskOnCheckpointException, environment);
            }
        };
        StreamTask streamTask = new StreamTask((Environment)environment, null){

            protected void init() throws Exception {
            }

            protected void run() throws Exception {
            }

            protected void cleanup() throws Exception {
            }

            protected void cancelTask() throws Exception {
            }

            protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
                return inspectingFactory;
            }
        };
        streamTask.invoke();
    }

    @Test
    public void testCheckpointConfigDefault() throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Assert.assertTrue((boolean)streamExecutionEnvironment.getCheckpointConfig().isFailOnCheckpointingErrors());
    }

    @Test
    public void testPropagationFailFromCheckpointConfig() throws Exception {
        this.doTestPropagationFromCheckpointConfig(true);
    }

    @Test
    public void testPropagationDeclineFromCheckpointConfig() throws Exception {
        this.doTestPropagationFromCheckpointConfig(false);
    }

    public void doTestPropagationFromCheckpointConfig(boolean failTaskOnCheckpointErrors) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000L);
        streamExecutionEnvironment.getCheckpointConfig().setFailOnCheckpointingErrors(failTaskOnCheckpointErrors);
        streamExecutionEnvironment.addSource((SourceFunction)new SourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            }

            public void cancel() {
            }
        }).addSink((SinkFunction)new DiscardingSink());
        StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        SerializedValue serializedExecutionConfig = jobGraph.getSerializedExecutionConfig();
        ExecutionConfig executionConfig = (ExecutionConfig)serializedExecutionConfig.deserializeValue(Thread.currentThread().getContextClassLoader());
        Assert.assertEquals((Object)failTaskOnCheckpointErrors, (Object)executionConfig.isFailTaskOnCheckpointError());
    }
}

