/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageFactory;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.TestingCheckpointStorageAccessCoordinatorView;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.util.CheckpointStorageUtils;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.streaming.util.StateBackendUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class CheckpointFailureManagerITCase
extends TestLogger {
    /**
     * Mini cluster shared by all tests of this class; it is built from the default
     * {@link MiniClusterResourceConfiguration} and both tests submit their jobs through its
     * cluster client.
     */
    @ClassRule
    public static MiniClusterWithClientResource cluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder().build());

    /**
     * Submits a job whose checkpoint storage is created by
     * {@link FailingFinalizationCheckpointStorageFactory} and expects the job to fail with a
     * checkpoint failure (as recognised by {@code isCheckpointFailure}).
     *
     * <p>The job checkpoints every 10 ms, tolerates zero checkpoint failures and has no restart
     * strategy. Any other {@link JobExecutionException} is rethrown, and a job that finishes
     * without failing makes the test fail.
     *
     * @throws Exception if the job cannot be submitted or fails for an unexpected reason
     */
    @Test
    public void testFinalizationFailureCounted() throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(10L);
        CheckpointStorageUtils.configureCheckpointStorageWithFactory(
                environment,
                "org.apache.flink.test.checkpointing.CheckpointFailureManagerITCase$FailingFinalizationCheckpointStorageFactory");
        environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
        RestartStrategyUtils.configureNoRestartStrategy(environment);
        environment.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).sinkTo(new DiscardingSink<>());

        final JobGraph graph =
                StreamingJobGraphGenerator.createJobGraph(environment.getStreamGraph());
        try {
            TestUtils.submitJobAndWaitForResult(
                    cluster.getClusterClient(), graph, getClass().getClassLoader());
            Assert.fail("The job should fail");
        } catch (JobExecutionException failure) {
            // Only the expected checkpoint failure is swallowed; everything else propagates.
            if (!this.isCheckpointFailure(failure)) {
                throw failure;
            }
        }
    }

    /**
     * Submits a job that uses the state backend created by
     * {@link AsyncFailureStateBackendFactory} and expects it to fail with a checkpoint failure
     * (as recognised by {@code isCheckpointFailure}) without the source ever being
     * re-initialized.
     *
     * <p>The job checkpoints every 500 ms and has no restart strategy. Any other
     * {@link JobExecutionException} is rethrown, and a job that finishes without failing makes
     * the test fail.
     *
     * @throws Exception if the job cannot be submitted or fails for an unexpected reason
     */
    @Test(timeout = 20000L)
    public void testAsyncCheckpointFailureTriggerJobFailed() throws Exception {
        // The counter is static and survives earlier executions in the same JVM (e.g. a re-run
        // of this test); reset it so the final assertion only counts this run.
        StringGeneratingSourceFunction.INITIALIZE_TIMES.set(0);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L);
        RestartStrategyUtils.configureNoRestartStrategy(env);
        StateBackendUtils.configureStateBackendWithFactory(
                env,
                "org.apache.flink.test.checkpointing.CheckpointFailureManagerITCase$AsyncFailureStateBackendFactory");
        env.addSource(new StringGeneratingSourceFunction()).sinkTo(new DiscardingSink<>());

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
        try {
            TestUtils.submitJobAndWaitForResult(
                    cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
            // Same guard as in testFinalizationFailureCounted: a job that ends without an
            // exception must not pass silently.
            Assert.fail("The job should fail");
        } catch (JobExecutionException jobException) {
            // Only the expected checkpoint failure is swallowed; everything else propagates.
            if (!isCheckpointFailure(jobException)) {
                throw jobException;
            }
        }

        // The source must have been initialized exactly once, i.e. the job was not restarted.
        Assert.assertEquals(1L, (long) StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
    }

    /**
     * Tells whether the given job failure was caused by too many failed checkpoints.
     *
     * <p>Looks up a {@link FlinkRuntimeException} in the cause chain of {@code jobException} and
     * checks that its message starts with
     * {@code "Exceeded checkpoint tolerable failure threshold."}. An exception without a message
     * is treated as "not a checkpoint failure" instead of raising a
     * {@link NullPointerException}.
     *
     * @param jobException the exception thrown by the job execution
     * @return {@code true} if a matching {@link FlinkRuntimeException} was found
     */
    private boolean isCheckpointFailure(JobExecutionException jobException) {
        return ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class)
                // Optional.map yields an empty Optional for a null message, so the prefix
                // check below never dereferences null.
                .map(Throwable::getMessage)
                .filter(message -> message.startsWith("Exceeded checkpoint tolerable failure threshold."))
                .isPresent();
    }

    /**
     * Checkpoint storage used by {@code testFinalizationFailureCounted}: every checkpoint
     * location it hands out throws a {@link RuntimeException} when its metadata output stream is
     * requested.
     */
    private static class FailingFinalizationCheckpointStorage implements CheckpointStorage {
        private static final long serialVersionUID = 8134582566514272546L;

        /** Returns a fresh {@link TestCompletedCheckpointStorageLocation}; the pointer is ignored. */
        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
            return new TestCompletedCheckpointStorageLocation();
        }

        /**
         * Creates a storage access whose per-checkpoint locations all fail on metadata stream
         * creation; the job id is ignored.
         */
        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) {
            return new TestingCheckpointStorageAccessCoordinatorView() {

                public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) {
                    return newFailingLocation();
                }
            };
        }

        /** Builds a location that throws as soon as a metadata output stream is requested. */
        private static CheckpointStorageLocation newFailingLocation() {
            return new NonPersistentMetadataCheckpointStorageLocation(Integer.MAX_VALUE) {

                public CheckpointMetadataOutputStream createMetadataOutputStream() {
                    throw new RuntimeException("finalization failure");
                }
            };
        }
    }

    /**
     * Factory for {@link FailingFinalizationCheckpointStorage}. It is referenced by its class
     * name in {@code testFinalizationFailureCounted}.
     */
    public static class FailingFinalizationCheckpointStorageFactory
            implements CheckpointStorageFactory<FailingFinalizationCheckpointStorage> {

        /**
         * Creates a new failing checkpoint storage; both arguments are ignored.
         *
         * @param config unused
         * @param classLoader unused
         * @return a fresh {@link FailingFinalizationCheckpointStorage}
         */
        public FailingFinalizationCheckpointStorage createFromConfig(
                ReadableConfig config, ClassLoader classLoader)
                throws IllegalConfigurationException {
            final FailingFinalizationCheckpointStorage storage =
                    new FailingFinalizationCheckpointStorage();
            return storage;
        }
    }

    /**
     * {@link HashMapStateBackend} variant used by
     * {@code testAsyncCheckpointFailureTriggerJobFailed}: the operator state backend it creates
     * runs a snapshot strategy whose result supplier always throws.
     */
    private static class AsyncFailureStateBackend extends HashMapStateBackend {
        private static final long serialVersionUID = 1L;

        /**
         * Snapshot strategy that prepares no resources and whose result supplier throws
         * {@code "Expected async snapshot exception."} when invoked.
         */
        private static final SnapshotStrategy<OperatorStateHandle, SnapshotResources>
                ASYNC_DECLINING_SNAPSHOT_STRATEGY =
                        new SnapshotStrategy<OperatorStateHandle, SnapshotResources>() {

                            public SnapshotResources syncPrepareResources(long checkpointId)
                                    throws Exception {
                                // Nothing to prepare: the supplier below fails unconditionally.
                                return null;
                            }

                            public SnapshotStrategy.SnapshotResultSupplier<OperatorStateHandle>
                                    asyncSnapshot(
                                            SnapshotResources syncPartResource,
                                            long checkpointId,
                                            long timestamp,
                                            @Nonnull CheckpointStreamFactory streamFactory,
                                            @Nonnull CheckpointOptions checkpointOptions) {
                                return registry -> {
                                    throw new Exception("Expected async snapshot exception.");
                                };
                            }
                        };

        /**
         * Builds a {@link DefaultOperatorStateBackend} with empty state maps that snapshots via
         * {@link #ASYNC_DECLINING_SNAPSHOT_STRATEGY} in
         * {@link SnapshotExecutionType#ASYNCHRONOUS} mode.
         */
        public OperatorStateBackend createOperatorStateBackend(
                StateBackend.OperatorStateBackendParameters parameters) {
            return new DefaultOperatorStateBackendBuilder(
                    parameters.getEnv().getUserCodeClassLoader().asClassLoader(),
                    parameters.getEnv().getExecutionConfig(),
                    true,
                    parameters.getStateHandles(),
                    parameters.getCancelStreamRegistry()) {

                public DefaultOperatorStateBackend build() {
                    final CloseableRegistry snapshotRegistry = new CloseableRegistry();
                    final SnapshotStrategyRunner<OperatorStateHandle, SnapshotResources>
                            failingRunner =
                                    new SnapshotStrategyRunner<>(
                                            "Async Failure State Backend",
                                            ASYNC_DECLINING_SNAPSHOT_STRATEGY,
                                            snapshotRegistry,
                                            SnapshotExecutionType.ASYNCHRONOUS);
                    return new DefaultOperatorStateBackend(
                            this.executionConfig,
                            snapshotRegistry,
                            new HashMap<>(),
                            new HashMap<>(),
                            new HashMap<>(),
                            new HashMap<>(),
                            failingRunner);
                }
            }.build();
        }

        /** Ignores the given configuration and returns this instance unchanged. */
        public AsyncFailureStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
            return this;
        }
    }

    /**
     * Factory for {@link AsyncFailureStateBackend}. It is referenced by its class name in
     * {@code testAsyncCheckpointFailureTriggerJobFailed}.
     */
    public static class AsyncFailureStateBackendFactory
            implements StateBackendFactory<AsyncFailureStateBackend> {

        /**
         * Creates a new failing state backend; both arguments are ignored.
         *
         * @param config unused
         * @param classLoader unused
         * @return a fresh {@link AsyncFailureStateBackend}
         */
        public AsyncFailureStateBackend createFromConfig(
                ReadableConfig config, ClassLoader classLoader)
                throws IllegalConfigurationException {
            final AsyncFailureStateBackend backend = new AsyncFailureStateBackend();
            return backend;
        }
    }

    /**
     * Source that emits a string built from 10 random bytes roughly every 10 ms until it is
     * cancelled, and keeps the number of emitted records in operator list state.
     *
     * <p>{@link #INITIALIZE_TIMES} counts how often {@code initializeState} has been called
     * across all instances in this JVM, which lets a test detect whether the source was
     * initialized more than once.
     */
    private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
            implements CheckpointedFunction {
        private static final long serialVersionUID = 1L;

        /** Descriptor of the list state holding the emitted-record count. */
        private static final ListStateDescriptor<Long> STATE_DESCRIPTOR =
                new ListStateDescriptor<>("emitted", Long.class);

        /** Number of {@code initializeState} invocations; static, so shared by all instances. */
        public static final AtomicInteger INITIALIZE_TIMES = new AtomicInteger(0);

        /** Reused buffer that is refilled with random bytes for every record. */
        private final byte[] randomBytes = new byte[10];

        private ListState<Long> listState;
        private long emitted = 0L;

        /** Cleared by {@link #cancel()} to stop the emit loop in {@code run}. */
        private volatile boolean isRunning = true;

        /** Replaces the list state content with the current emitted-record count. */
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.listState.update(Collections.singletonList(this.emitted));
        }

        /** Obtains the list state from the operator state store and counts the invocation. */
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.listState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
            INITIALIZE_TIMES.incrementAndGet();
        }

        /**
         * Emits random strings until {@link #cancel()} is called.
         *
         * @param ctx source context used to emit records and to obtain the checkpoint lock
         * @throws Exception if the sleep between two records is interrupted
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.isRunning) {
                ThreadLocalRandom.current().nextBytes(this.randomBytes);
                // Emit the record and bump the counter under the checkpoint lock so both
                // happen together with respect to anything else synchronizing on that lock.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(new String(this.randomBytes));
                    ++this.emitted;
                }
                Thread.sleep(10L);
            }
        }

        /** Requests the emit loop in {@code run} to stop after its current iteration. */
        public void cancel() {
            this.isRunning = false;
        }
    }
}

