package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.class */
public class CheckpointExceptionHandlerConfigurationTest extends TestLogger {
    @Test
    public void testConfigurationFailOnException() throws Exception {
        testConfigForwarding(true);
    }

    @Test
    public void testConfigurationDeclineOnException() throws Exception {
        testConfigForwarding(false);
    }

    @Test
    public void testFailIsDefaultConfig() {
        Assert.assertTrue(new ExecutionConfig().isFailTaskOnCheckpointError());
    }

    private void testConfigForwarding(final boolean z) throws Exception {
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test", 1, 0);
        dummyEnvironment.setTaskStateManager(new TestTaskStateManager());
        dummyEnvironment.getExecutionConfig().setFailTaskOnCheckpointError(z);
        final CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory() { // from class: org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandlerConfigurationTest.1
            public CheckpointExceptionHandler createCheckpointExceptionHandler(boolean z2, Environment environment) {
                Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(z2));
                return super.createCheckpointExceptionHandler(z2, environment);
            }
        };
        new StreamTask(dummyEnvironment, null) { // from class: org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandlerConfigurationTest.2
            protected void init() throws Exception {
            }

            protected void run() throws Exception {
            }

            protected void cleanup() throws Exception {
            }

            protected void cancelTask() throws Exception {
            }

            protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
                return checkpointExceptionHandlerFactory;
            }
        }.invoke();
    }

    @Test
    public void testCheckpointConfigDefault() throws Exception {
        Assert.assertTrue(StreamExecutionEnvironment.getExecutionEnvironment().getCheckpointConfig().isFailOnCheckpointingErrors());
    }

    @Test
    public void testPropagationFailFromCheckpointConfig() throws Exception {
        doTestPropagationFromCheckpointConfig(true);
    }

    @Test
    public void testPropagationDeclineFromCheckpointConfig() throws Exception {
        doTestPropagationFromCheckpointConfig(false);
    }

    public void doTestPropagationFromCheckpointConfig(boolean z) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.getCheckpointConfig().setCheckpointInterval(1000L);
        executionEnvironment.getCheckpointConfig().setFailOnCheckpointingErrors(z);
        executionEnvironment.addSource(new SourceFunction<Integer>() { // from class: org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandlerConfigurationTest.3
            public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            }

            public void cancel() {
            }
        }).addSink(new DiscardingSink());
        Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(((ExecutionConfig) StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getSerializedExecutionConfig().deserializeValue(Thread.currentThread().getContextClassLoader())).isFailTaskOnCheckpointError()));
    }
}
