/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class SnapshotFileMergingCompatibilityITCase
extends TestLogger {
    private static final long DELETE_TIMEOUT_MILLS = 120000L;

    public static Collection<Object[]> parameters() {
        return Arrays.asList({RestoreMode.CLAIM, true}, {RestoreMode.CLAIM, false}, {RestoreMode.NO_CLAIM, true}, {RestoreMode.NO_CLAIM, false});
    }

    /**
     * Runs the switching scenario with file merging disabled for the first job run and enabled
     * for every later run.
     *
     * @param restoreMode restore mode forwarded to the scenario
     * @param fileMergingAcrossBoundary value forwarded for the across-boundary option
     * @param checkpointDir temporary directory used as the checkpoint directory
     * @throws Exception if the scenario fails
     */
    @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}")
    @MethodSource("parameters")
    public void testSwitchFromDisablingToEnablingFileMerging(RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir java.nio.file.Path checkpointDir) throws Exception {
        final boolean fileMergingInFirstRun = false;
        final boolean fileMergingInLaterRuns = true;
        testSwitchingFileMerging(
                checkpointDir,
                fileMergingInFirstRun,
                fileMergingInLaterRuns,
                restoreMode,
                fileMergingAcrossBoundary);
    }

    /**
     * Runs the switching scenario with file merging enabled for the first job run and disabled
     * for every later run.
     *
     * @param restoreMode restore mode forwarded to the scenario
     * @param fileMergingAcrossBoundary value forwarded for the across-boundary option
     * @param checkpointDir temporary directory used as the checkpoint directory
     * @throws Exception if the scenario fails
     */
    @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}")
    @MethodSource("parameters")
    public void testSwitchFromEnablingToDisablingFileMerging(RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir java.nio.file.Path checkpointDir) throws Exception {
        final boolean fileMergingInFirstRun = true;
        final boolean fileMergingInLaterRuns = false;
        testSwitchingFileMerging(
                checkpointDir,
                fileMergingInFirstRun,
                fileMergingInLaterRuns,
                restoreMode,
                fileMergingAcrossBoundary);
    }

    /**
     * Runs four job rounds, each restoring from the checkpoint of the previous round, while
     * switching the file-merging option between the first and the second round.
     *
     * <ol>
     *   <li>Round 1 runs with {@code firstFileMergingSwitch}, without a restore path, on a
     *       cluster of 2 task managers with 2 slots each.
     *   <li>Round 2 runs with {@code secondFileMergingSwitch}, restores from round 1, and then
     *       checks the fate of the round-1 checkpoint.
     *   <li>Round 3 keeps {@code secondFileMergingSwitch}, restores from round 2 on 3 task
     *       managers, and then checks the fate of the round-2 checkpoint.
     *   <li>Round 4 restores from round 3, checks the fate of the round-3 checkpoint and waits
     *       for its own checkpoint to be gone.
     * </ol>
     *
     * <p>After rounds 1-3 the state handle types in the produced metadata are checked against
     * the file-merging setting of that round. Every round uses its own cluster resource, set up
     * with {@code before()} and released with {@code after()} in a {@code finally} block.
     *
     * @param checkpointDir directory configured as the checkpoints directory
     * @param firstFileMergingSwitch file-merging setting for round 1
     * @param secondFileMergingSwitch file-merging setting for rounds 2-4
     * @param restoreMode restore mode passed to every job run
     * @param fileMergingAcrossBoundary value for the across-boundary option
     * @throws Exception if any round or verification fails
     */
    private void testSwitchingFileMerging(java.nio.file.Path checkpointDir, boolean firstFileMergingSwitch, boolean secondFileMergingSwitch, RestoreMode restoreMode, boolean fileMergingAcrossBoundary) throws Exception {
        final Configuration config = new Configuration();
        // Passed to every job run; presumably the number of consecutive checkpoints to wait
        // for before the externalized checkpoint path is returned (verify against the helper).
        final int consecutiveCheckpoint = 4;
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toUri().toString());
        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, fileMergingAcrossBoundary);
        config.set(CheckpointingOptions.FILE_MERGING_ENABLED, firstFileMergingSwitch);

        // Round 1: initial run with the first file-merging setting, nothing to restore from.
        MiniClusterWithClientResource firstCluster = createCluster(config, 2);
        EmbeddedRocksDBStateBackend stateBackend1 = createStateBackend(config);
        firstCluster.before();
        String firstCheckpoint;
        CheckpointMetadata firstMetadata;
        try {
            firstCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(stateBackend1, null, firstCluster, restoreMode, config, consecutiveCheckpoint, true);
            Assertions.assertThat(firstCheckpoint).isNotNull();
            firstMetadata = TestUtils.loadCheckpointMetadata(firstCheckpoint);
            verifyStateHandleType(firstMetadata, firstFileMergingSwitch);
        } finally {
            firstCluster.after();
        }

        // Round 2: flip the file-merging setting before the backend and cluster are created.
        config.set(CheckpointingOptions.FILE_MERGING_ENABLED, secondFileMergingSwitch);
        EmbeddedRocksDBStateBackend stateBackend2 = createStateBackend(config);
        MiniClusterWithClientResource secondCluster = createCluster(config, 2);
        secondCluster.before();
        String secondCheckpoint;
        CheckpointMetadata secondMetadata;
        try {
            secondCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(stateBackend2, firstCheckpoint, secondCluster, restoreMode, config, consecutiveCheckpoint, true);
            Assertions.assertThat(secondCheckpoint).isNotNull();
            secondMetadata = TestUtils.loadCheckpointMetadata(secondCheckpoint);
            verifyStateHandleType(secondMetadata, secondFileMergingSwitch);
            verifyCheckpointExistOrWaitDeleted(firstCheckpoint, determineFileExist(restoreMode, firstFileMergingSwitch, secondFileMergingSwitch), firstFileMergingSwitch, firstMetadata);
        } finally {
            secondCluster.after();
        }

        // Round 3: same setting as round 2, but on a cluster with 3 task managers.
        EmbeddedRocksDBStateBackend stateBackend3 = createStateBackend(config);
        MiniClusterWithClientResource thirdCluster = createCluster(config, 3);
        thirdCluster.before();
        String thirdCheckpoint;
        CheckpointMetadata thirdMetadata;
        try {
            thirdCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(stateBackend3, secondCheckpoint, thirdCluster, restoreMode, config, consecutiveCheckpoint, true);
            Assertions.assertThat(thirdCheckpoint).isNotNull();
            thirdMetadata = TestUtils.loadCheckpointMetadata(thirdCheckpoint);
            verifyStateHandleType(thirdMetadata, secondFileMergingSwitch);
            verifyCheckpointExistOrWaitDeleted(secondCheckpoint, determineFileExist(restoreMode, secondFileMergingSwitch, secondFileMergingSwitch), secondFileMergingSwitch, secondMetadata);
        } finally {
            thirdCluster.after();
        }

        // Round 4: the last flag is false here (true in the earlier rounds); the checkpoint
        // produced by this round is then expected to disappear.
        EmbeddedRocksDBStateBackend stateBackend4 = createStateBackend(config);
        MiniClusterWithClientResource fourthCluster = createCluster(config, 3);
        fourthCluster.before();
        try {
            String fourthCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(stateBackend4, thirdCheckpoint, fourthCluster, restoreMode, config, consecutiveCheckpoint, false);
            Assertions.assertThat(fourthCheckpoint).isNotNull();
            verifyCheckpointExistOrWaitDeleted(thirdCheckpoint, determineFileExist(restoreMode, secondFileMergingSwitch, secondFileMergingSwitch), secondFileMergingSwitch, thirdMetadata);
            verifyCheckpointExistOrWaitDeleted(fourthCheckpoint, TernaryBoolean.FALSE, secondFileMergingSwitch, null);
        } finally {
            fourthCluster.after();
        }
    }

    /** Builds (without starting) a cluster resource with the given number of task managers, 2 slots each. */
    private static MiniClusterWithClientResource createCluster(Configuration config, int numberTaskManagers) {
        return new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(config)
                        .setNumberTaskManagers(numberTaskManagers)
                        .setNumberSlotsPerTaskManager(2)
                        .build());
    }

    /** Creates a RocksDB state backend and invokes {@code configure} on it with the given configuration. */
    private static EmbeddedRocksDBStateBackend createStateBackend(Configuration config) {
        EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
        // NOTE(review): the return value of configure(...) is discarded, exactly as before. If
        // configure returns a configured copy instead of mutating the receiver, this call has
        // no effect on the returned backend -- confirm.
        stateBackend.configure(config, Thread.currentThread().getContextClassLoader());
        return stateBackend;
    }

    /**
     * Asserts that the keyed state recorded in the given checkpoint metadata uses the handle
     * types expected for the given file-merging setting.
     *
     * <p>Every managed keyed state handle must be an {@link IncrementalRemoteKeyedStateHandle}.
     * Each of its sub handles must be a {@link SegmentFileStateHandle} when file merging is
     * enabled and must not be one when it is disabled. The method also asserts that at least one
     * keyed state handle was seen.
     *
     * @param metadata checkpoint metadata to inspect
     * @param fileMergingEnabled whether the checkpoint was taken with file merging enabled
     * @throws IOException declared for callers; not thrown by the visible body
     */
    private void verifyStateHandleType(CheckpointMetadata metadata, boolean fileMergingEnabled) throws IOException {
        boolean hasKeyedState = false;
        for (OperatorState operatorState : metadata.getOperatorStates()) {
            for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                // Typed copy: iterating a raw ArrayList yields Object and does not compile.
                Collection<KeyedStateHandle> keyedStateHandles = new ArrayList<>(subtaskState.getManagedKeyedState());
                for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
                    Assertions.assertThat(keyedStateHandle).isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
                    ((IncrementalRemoteKeyedStateHandle) keyedStateHandle).streamSubHandles().forEach(handle -> {
                        if (fileMergingEnabled) {
                            Assertions.assertThat(handle).isInstanceOf(SegmentFileStateHandle.class);
                        } else {
                            Assertions.assertThat(handle).isNotInstanceOf(SegmentFileStateHandle.class);
                        }
                    });
                    hasKeyedState = true;
                }
            }
        }
        // Guards against a vacuous pass when the metadata contains no keyed state at all.
        Assertions.assertThat(hasKeyedState).isTrue();
    }

    /**
     * Decides what to expect of a restored-from checkpoint's files after the next job run.
     *
     * <ul>
     *   <li>Any mode other than {@code CLAIM}: {@link TernaryBoolean#TRUE}.
     *   <li>{@code CLAIM} with file merging enabled in the previous or the current run:
     *       {@link TernaryBoolean#FALSE}.
     *   <li>{@code CLAIM} with file merging disabled in both runs:
     *       {@link TernaryBoolean#UNDEFINED}.
     * </ul>
     *
     * @param mode restore mode used for the current run
     * @param lastFileMergingEnabled whether the previous run had file merging enabled
     * @param thisFileMergingEnabled whether the current run has file merging enabled
     * @return the expectation as described above
     */
    private static TernaryBoolean determineFileExist(RestoreMode mode, boolean lastFileMergingEnabled, boolean thisFileMergingEnabled) {
        if (mode != RestoreMode.CLAIM) {
            return TernaryBoolean.TRUE;
        }
        boolean fileMergingInvolved = lastFileMergingEnabled || thisFileMergingEnabled;
        return fileMergingInvolved ? TernaryBoolean.FALSE : TernaryBoolean.UNDEFINED;
    }

    /**
     * Verifies that a checkpoint either still exists or, within a timeout, disappears.
     *
     * <p>The parent of {@code checkpointPath} and its {@code shared} and {@code taskowned}
     * children must always exist. Beyond that:
     *
     * <ul>
     *   <li>{@link TernaryBoolean#TRUE}: the checkpoint directory must exist, all files
     *       referenced by {@code metadata} (if given) must exist, and {@code taskowned} must be
     *       non-empty exactly when {@code fileMergingEnabled} is set.
     *   <li>{@link TernaryBoolean#FALSE}: polls every 500 ms until the checkpoint directory is
     *       gone, all files referenced by {@code metadata} (if given) are gone, and neither
     *       {@code shared} nor {@code taskowned} contains a sub-directory. Fails once
     *       {@code DELETE_TIMEOUT_MILLS} has elapsed.
     *   <li>{@link TernaryBoolean#UNDEFINED}: nothing further is checked.
     * </ul>
     *
     * @param checkpointPath path of the checkpoint directory
     * @param exist expected fate of the checkpoint
     * @param fileMergingEnabled whether the checkpoint was taken with file merging enabled
     * @param metadata metadata of the checkpoint, or {@code null} to skip per-file checks
     * @throws Exception on file system errors outside the polling loop, or if interrupted
     */
    private void verifyCheckpointExistOrWaitDeleted(String checkpointPath, TernaryBoolean exist, boolean fileMergingEnabled, CheckpointMetadata metadata) throws Exception {
        final long pollIntervalMillis = 500L;
        Path checkpointDir = new Path(checkpointPath);
        FileSystem fs = checkpointDir.getFileSystem();
        Path baseDir = checkpointDir.getParent();
        Path sharedFile = new Path(baseDir, "shared");
        Path taskOwnedFile = new Path(baseDir, "taskowned");
        Assertions.assertThat(fs.exists(baseDir)).isTrue();
        Assertions.assertThat(fs.exists(sharedFile)).isTrue();
        Assertions.assertThat(fs.exists(taskOwnedFile)).isTrue();

        if (exist.equals(TernaryBoolean.TRUE)) {
            Assertions.assertThat(fs.exists(checkpointDir)).isTrue();
            Assertions.assertThat(metadata == null || verifyCheckpointExist(metadata, true)).isTrue();
            // List once so both halves of the condition see the same snapshot.
            FileStatus[] taskOwnedEntries = fs.listStatus(taskOwnedFile);
            boolean taskOwnedNotEmpty = taskOwnedEntries != null && taskOwnedEntries.length > 0;
            Assertions.assertThat(taskOwnedNotEmpty).isEqualTo(fileMergingEnabled);
        } else if (exist.equals(TernaryBoolean.FALSE)) {
            long waited = 0L;
            while (true) {
                boolean fileExist = true;
                try {
                    fileExist = fs.exists(checkpointDir)
                            || (metadata != null && !verifyCheckpointExist(metadata, false))
                            || !verifyCheckpointNoDirectory(fs, sharedFile, taskOwnedFile);
                } catch (IOException ignored) {
                    // Best effort: a transient file system error is treated as "still there"
                    // and the check is retried on the next poll.
                }
                if (!fileExist) {
                    return;
                }
                Thread.sleep(pollIntervalMillis);
                waited += pollIntervalMillis;
                if (waited >= DELETE_TIMEOUT_MILLS) {
                    // These assertions give a precise message for the common causes.
                    Assertions.assertThat(fs.exists(checkpointDir)).isFalse();
                    Assertions.assertThat(fs.listStatus(sharedFile)).isNullOrEmpty();
                    Assertions.assertThat(fs.listStatus(taskOwnedFile)).isNullOrEmpty();
                    // They can all pass while files referenced by the metadata still exist;
                    // fail explicitly instead of polling forever.
                    Assertions.fail("Files of checkpoint " + checkpointPath + " were not deleted within " + DELETE_TIMEOUT_MILLS + " ms.");
                }
            }
        }
    }

    /**
     * Checks whether every file and directory referenced by the given checkpoint metadata has
     * the expected existence.
     *
     * <p>For each subtask state, the sub handles of every managed keyed state handle are
     * checked (only {@link FileStateHandle} and {@link SegmentFileStateHandle} are inspected,
     * other handle types are accepted). Then the shared and task-owned directories of every
     * {@link FileMergingOperatorStreamStateHandle} among the managed operator state are checked.
     * The first mismatch, or an {@link IOException} during a check, yields {@code false}.
     *
     * <p>The keyed-state result is now combined as a conjunction. Previously it was OR-ed into
     * a flag that started as {@code true}, so it never influenced the result and could even
     * reset an earlier {@code false} back to {@code true}.
     *
     * @param metadata checkpoint metadata whose referenced files are checked
     * @param exist {@code true} to require that everything exists, {@code false} to require
     *     that nothing exists
     * @return {@code true} if all inspected paths match {@code exist}
     */
    private boolean verifyCheckpointExist(CheckpointMetadata metadata, boolean exist) {
        for (OperatorState operatorState : metadata.getOperatorStates()) {
            for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                if (!keyedStateFilesMatch(subtaskState, exist) || !operatorStateDirsMatch(subtaskState, exist)) {
                    return false;
                }
            }
        }
        return true;
    }

    /** Returns whether all file-backed sub handles of the subtask's managed keyed state match {@code exist}. */
    private boolean keyedStateFilesMatch(OperatorSubtaskState subtaskState, boolean exist) {
        Collection<KeyedStateHandle> keyedStateHandles = new ArrayList<>(subtaskState.getManagedKeyedState());
        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
            Assertions.assertThat(keyedStateHandle).isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
            boolean allSubHandlesMatch = ((IncrementalRemoteKeyedStateHandle) keyedStateHandle).streamSubHandles().allMatch(handle -> {
                try {
                    if (handle instanceof FileStateHandle) {
                        return existenceMatches(((FileStateHandle) handle).getFilePath(), exist);
                    }
                    if (handle instanceof SegmentFileStateHandle) {
                        return existenceMatches(((SegmentFileStateHandle) handle).getFilePath(), exist);
                    }
                } catch (IOException e) {
                    log.warn("An error occurred when trying to check the file existence.", e);
                    return false;
                }
                // Handles that are not backed by a file path are not inspected.
                return true;
            });
            if (!allSubHandlesMatch) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether the shared and task-owned directories of all file-merging operator state handles match {@code exist}. */
    private boolean operatorStateDirsMatch(OperatorSubtaskState subtaskState, boolean exist) {
        Collection<OperatorStateHandle> operatorStateHandles = new ArrayList<>(subtaskState.getManagedOperatorState());
        for (OperatorStateHandle handle : operatorStateHandles) {
            if (!(handle instanceof FileMergingOperatorStreamStateHandle)) {
                continue;
            }
            FileMergingOperatorStreamStateHandle fileMergingHandle = (FileMergingOperatorStreamStateHandle) handle;
            try {
                if (!existenceMatches(fileMergingHandle.getSharedDirHandle().getDirectory(), exist)
                        || !existenceMatches(fileMergingHandle.getTaskOwnedDirHandle().getDirectory(), exist)) {
                    return false;
                }
            } catch (IOException e) {
                log.warn("An error occurred when trying to check the file existence.", e);
                return false;
            }
        }
        return true;
    }

    /** Returns whether the actual existence of {@code path} on its file system equals {@code exist}. */
    private static boolean existenceMatches(Path path, boolean exist) throws IOException {
        return exist == path.getFileSystem().exists(path);
    }

    /**
     * Tells whether neither of the two given directories has a sub-directory among its direct
     * children.
     *
     * <p>{@code sharedFile} is listed first; {@code taskOwnedFile} is only listed when the first
     * one contains no directory. A {@code null} listing counts as "no directory".
     *
     * @param fs file system used for listing
     * @param sharedFile first directory to inspect
     * @param taskOwnedFile second directory to inspect
     * @return {@code false} as soon as a directory entry is found, {@code true} otherwise
     * @throws IOException if listing a directory fails
     */
    private static boolean verifyCheckpointNoDirectory(FileSystem fs, Path sharedFile, Path taskOwnedFile) throws IOException {
        for (Path parent : new Path[] {sharedFile, taskOwnedFile}) {
            FileStatus[] children = fs.listStatus(parent);
            if (children != null && Arrays.stream(children).anyMatch(FileStatus::isDir)) {
                return false;
            }
        }
        return true;
    }
}

