/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageFactory;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.util.CheckpointStorageUtils;
import org.apache.flink.streaming.util.StateBackendUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class UnalignedCheckpointFailureHandlingITCase {
    /** Parallelism of the test job; also the number of single-slot task managers started below. */
    private static final int PARALLELISM = 2;

    /** Temporary directory rule; the test checkpoint storage creates its folders through it. */
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Produces the {@link SharedReference} handles through which the test passes objects to
     * {@link TestCheckpointStorageFactory}.
     */
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /**
     * Mini cluster with one slot per task manager. The number of task managers is tied to {@link
     * #PARALLELISM} so that slot count and job parallelism cannot drift apart.
     */
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(PARALLELISM)
                            .setNumberSlotsPerTaskManager(1)
                            .build());

    /**
     * Checks that a checkpoint can still complete after an earlier checkpoint failed.
     *
     * <p>The job is started with {@link TestCheckpointStorageFactory} as checkpoint storage and a
     * failure switch that is initially {@code true}. Once all tasks are running, checkpoints are
     * triggered until one fails with {@link TestException}; afterwards one more checkpoint is
     * triggered and awaited.
     *
     * @throws Exception if the job cannot be started, an unexpected checkpoint failure occurs, or
     *     the final checkpoint does not complete successfully
     */
    @Test
    public void testCheckpointSuccessAfterFailure() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The factory is configured by class name only, so the switch and the temp folder are
        // published through its static references. No widening casts here: the references must
        // keep their element types to match the factory's typed fields.
        TestCheckpointStorageFactory.failOnCloseRef = sharedObjects.add(new AtomicBoolean(true));
        TestCheckpointStorageFactory.tempFolderRef = sharedObjects.add(temporaryFolder);

        // Class#getName() yields the binary name (with '$'), so it follows renames automatically.
        configure(env, TestCheckpointStorageFactory.class.getName());
        buildGraph(env);

        MockStateBackend stateBackend =
                new MockStateBackend(MockKeyedStateBackend.MockSnapshotSupplier.EMPTY);
        JobClient jobClient =
                StateBackendUtils.configureStateBackendAndExecuteAsync(env, stateBackend);
        JobID jobID = jobClient.getJobID();
        MiniCluster miniCluster = miniClusterResource.getMiniCluster();

        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.RUNNING));
        CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);

        triggerFailingCheckpoint(jobID, TestException.class, miniCluster);

        // The actual assertion: get() throws if this follow-up checkpoint fails.
        miniCluster.triggerCheckpoint(jobID).get();
    }

    /**
     * Applies the checkpointing and parallelism settings used by this test.
     *
     * @param env environment to configure
     * @param storageFactory class name of the checkpoint storage factory to install
     */
    private void configure(StreamExecutionEnvironment env, String storageFactory) {
        // Interval of Long.MAX_VALUE: presumably so that only the checkpoints triggered
        // explicitly by the test ever run.
        env.enableCheckpointing(Long.MAX_VALUE, CheckpointingMode.EXACTLY_ONCE);
        CheckpointStorageUtils.configureCheckpointStorageWithFactory(env, storageFactory);

        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO);
        // The injected checkpoint failure must not count against the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

        // Use the shared constant rather than a literal so it stays in sync with the class.
        env.setParallelism(PARALLELISM);
        env.disableOperatorChaining();
    }

    /**
     * Builds the job topology: a number sequence source from {@code 0} to {@code Long.MAX_VALUE},
     * keyed by the value itself, followed by a map that sleeps one millisecond per record and a
     * discarding sink.
     *
     * <p>All stages are written with inferred generic types instead of raw types and casts, so the
     * {@code Long} element type stays visible to the compiler across the whole pipeline.
     *
     * @param env environment the topology is added to
     */
    private void buildGraph(StreamExecutionEnvironment env) {
        env.fromSource(
                        new NumberSequenceSource(0L, Long.MAX_VALUE),
                        WatermarkStrategy.noWatermarks(),
                        "num-source")
                .keyBy(value -> value)
                .map(
                        value -> {
                            // Slows each record down by one millisecond.
                            Thread.sleep(1L);
                            return value;
                        })
                .sinkTo(new DiscardingSink<>());
    }

    /**
     * Triggers checkpoints until one of them fails with the expected exception.
     *
     * <p>A successful checkpoint leads to a 50 ms pause and another attempt. A failure that is
     * not caused by {@code expectedException} is rethrown.
     *
     * @param jobID job whose checkpoints are triggered
     * @param expectedException exception type that marks the awaited failure
     * @param miniCluster cluster used to trigger the checkpoints
     * @throws InterruptedException if interrupted while waiting or sleeping
     * @throws ExecutionException if waiting on the checkpoint future fails
     */
    private void triggerFailingCheckpoint(JobID jobID, Class<TestException> expectedException, MiniCluster miniCluster) throws InterruptedException, ExecutionException {
        for (;;) {
            // Fold the outcome into an Optional: empty on success, the error on failure.
            CompletableFuture<Optional<Throwable>> attempt =
                    miniCluster
                            .triggerCheckpoint(jobID)
                            .thenApply(ign -> Optional.empty())
                            .handle((ign, err) -> Optional.ofNullable(err));
            Optional<Throwable> failure = attempt.get();

            if (failure.isPresent()) {
                Throwable error = failure.get();
                if (isCausedBy(error, expectedException)) {
                    return;
                }
                ExceptionUtils.rethrow(error);
            } else {
                // Checkpoint succeeded; wait briefly and try again.
                Thread.sleep(50L);
            }
        }
    }

    /**
     * Tells whether {@code t} carries a {@link SerializedThrowable} whose deserialized form
     * contains an exception of the expected type.
     *
     * @param t throwable to inspect
     * @param expectedException exception type to look for
     * @return {@code true} if the expected exception was found, {@code false} otherwise
     */
    private boolean isCausedBy(Throwable t, Class<TestException> expectedException) {
        Optional<SerializedThrowable> serialized =
                ExceptionUtils.findThrowable(t, SerializedThrowable.class);
        if (!serialized.isPresent()) {
            return false;
        }
        // Deserialize with this test's class loader before searching for the expected type.
        Throwable restored = serialized.get().deserializeError(getClass().getClassLoader());
        return ExceptionUtils.findThrowable(restored, expectedException).isPresent();
    }

    /** Marker {@link IOException} thrown by the failing output stream of this test. */
    private static class TestException extends IOException {

        /**
         * Creates the exception.
         *
         * @param message detail message handed to {@link IOException}
         */
        public TestException(final String message) {
            super(message);
        }
    }

    private static class FailingOnceFsCheckpointOutputStream
    extends FsCheckpointStreamFactory.FsCheckpointStateOutputStream {
        private final AtomicBoolean failOnClose;
        private volatile boolean failedCloseAndGetHandle = false;

        public FailingOnceFsCheckpointOutputStream(File path, int bufferSize, int localStateThreshold, AtomicBoolean failOnClose) throws IOException {
            super(Path.fromLocalFile((File)path.getAbsoluteFile()), FileSystem.get((URI)path.toURI()), bufferSize, localStateThreshold);
            this.failOnClose = failOnClose;
        }

        public StreamStateHandle closeAndGetHandle() throws IOException {
            if (this.failOnClose.get()) {
                this.failedCloseAndGetHandle = true;
                throw new TestException("failure from closeAndGetHandle");
            }
            return super.closeAndGetHandle();
        }

        public void close() {
            if (this.failedCloseAndGetHandle && this.failOnClose.compareAndSet(true, false)) {
                ExceptionUtils.rethrow((Throwable)new TestException("failure from close"));
            } else {
                super.close();
            }
        }
    }

    private static class TestCheckpointStorageAccess
    implements CheckpointStorageAccess {
        private final CheckpointStorageAccess delegate;
        private final AtomicBoolean failOnClose;
        private final File path;

        public TestCheckpointStorageAccess(CheckpointStorageAccess delegate, AtomicBoolean failOnClose, File file) {
            this.delegate = delegate;
            this.failOnClose = failOnClose;
            this.path = file;
        }

        public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) {
            return new CheckpointStreamFactory(){

                public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
                    return new FailingOnceFsCheckpointOutputStream(path, 100, 0, failOnClose);
                }

                public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) {
                    return false;
                }

                public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
                    throw new UnsupportedEncodingException();
                }
            };
        }

        public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
            return this.delegate.createTaskOwnedStateStream();
        }

        public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
            return this.delegate.createTaskOwnedCheckpointStateToolset();
        }

        public boolean supportsHighlyAvailableStorage() {
            return this.delegate.supportsHighlyAvailableStorage();
        }

        public boolean hasDefaultSavepointLocation() {
            return this.delegate.hasDefaultSavepointLocation();
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
            return this.delegate.resolveCheckpoint(externalPointer);
        }

        public void initializeBaseLocationsForCheckpoint() throws IOException {
            this.delegate.initializeBaseLocationsForCheckpoint();
        }

        public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
            return this.delegate.initializeLocationForCheckpoint(checkpointId);
        }

        public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) throws IOException {
            return this.delegate.initializeLocationForSavepoint(checkpointId, externalLocationPointer);
        }
    }

    private static class TestCheckpointStorage
    implements CheckpointStorage {
        private final CheckpointStorage delegate;
        private final SharedReference<AtomicBoolean> failOnCloseRef;
        private final SharedReference<TemporaryFolder> tempFolderRef;

        private TestCheckpointStorage(CheckpointStorage delegate, SharedReference<AtomicBoolean> failOnCloseRef, SharedReference<TemporaryFolder> tempFolderRef) {
            this.delegate = delegate;
            this.failOnCloseRef = failOnCloseRef;
            this.tempFolderRef = tempFolderRef;
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
            return new TestCheckpointStorageAccess(this.delegate.createCheckpointStorage(jobId), (AtomicBoolean)this.failOnCloseRef.get(), ((TemporaryFolder)this.tempFolderRef.get()).newFolder());
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
            return this.delegate.resolveCheckpoint(externalPointer);
        }
    }

    public static class TestCheckpointStorageFactory
    implements CheckpointStorageFactory {
        private static SharedReference<AtomicBoolean> failOnCloseRef;
        private static SharedReference<TemporaryFolder> tempFolderRef;

        public CheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
            return new TestCheckpointStorage((CheckpointStorage)new JobManagerCheckpointStorage(), failOnCloseRef, tempFolderRef);
        }
    }
}

