package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.class */
public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
    private static final long DELETE_TIMEOUT_MILLS = 120000;

    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{RestoreMode.CLAIM, true}, new Object[]{RestoreMode.CLAIM, false}, new Object[]{RestoreMode.NO_CLAIM, true}, new Object[]{RestoreMode.NO_CLAIM, false});
    }

    @MethodSource({"parameters"})
    @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}")
    public void testSwitchFromDisablingToEnablingFileMerging(RestoreMode restoreMode, boolean z, @TempDir Path path) throws Exception {
        testSwitchingFileMerging(path, false, true, restoreMode, z);
    }

    @MethodSource({"parameters"})
    @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}")
    public void testSwitchFromEnablingToDisablingFileMerging(RestoreMode restoreMode, boolean z, @TempDir Path path) throws Exception {
        testSwitchingFileMerging(path, true, false, restoreMode, z);
    }

    private void testSwitchingFileMerging(Path path, boolean z, boolean z2, RestoreMode restoreMode, boolean z3) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, path.toUri().toString());
        configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        configuration.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, Boolean.valueOf(z3));
        configuration.set(CheckpointingOptions.FILE_MERGING_ENABLED, Boolean.valueOf(z));
        MiniClusterWithClientResource miniClusterWithClientResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        embeddedRocksDBStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader());
        miniClusterWithClientResource.before();
        try {
            String runJobAndGetExternalizedCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(embeddedRocksDBStateBackend, null, miniClusterWithClientResource, restoreMode, configuration, 4, true);
            Assertions.assertThat(runJobAndGetExternalizedCheckpoint).isNotNull();
            CheckpointMetadata loadCheckpointMetadata = TestUtils.loadCheckpointMetadata(runJobAndGetExternalizedCheckpoint);
            verifyStateHandleType(loadCheckpointMetadata, z);
            miniClusterWithClientResource.after();
            configuration.set(CheckpointingOptions.FILE_MERGING_ENABLED, Boolean.valueOf(z2));
            EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend2 = new EmbeddedRocksDBStateBackend();
            embeddedRocksDBStateBackend2.configure(configuration, Thread.currentThread().getContextClassLoader());
            MiniClusterWithClientResource miniClusterWithClientResource2 = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
            miniClusterWithClientResource2.before();
            try {
                String runJobAndGetExternalizedCheckpoint2 = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(embeddedRocksDBStateBackend2, runJobAndGetExternalizedCheckpoint, miniClusterWithClientResource2, restoreMode, configuration, 4, true);
                Assertions.assertThat(runJobAndGetExternalizedCheckpoint2).isNotNull();
                CheckpointMetadata loadCheckpointMetadata2 = TestUtils.loadCheckpointMetadata(runJobAndGetExternalizedCheckpoint2);
                verifyStateHandleType(loadCheckpointMetadata2, z2);
                verifyCheckpointExistOrWaitDeleted(runJobAndGetExternalizedCheckpoint, determineFileExist(restoreMode, z, z2), z, loadCheckpointMetadata);
                miniClusterWithClientResource2.after();
                EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend3 = new EmbeddedRocksDBStateBackend();
                embeddedRocksDBStateBackend3.configure(configuration, Thread.currentThread().getContextClassLoader());
                MiniClusterWithClientResource miniClusterWithClientResource3 = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(3).setNumberSlotsPerTaskManager(2).build());
                miniClusterWithClientResource3.before();
                try {
                    String runJobAndGetExternalizedCheckpoint3 = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(embeddedRocksDBStateBackend3, runJobAndGetExternalizedCheckpoint2, miniClusterWithClientResource3, restoreMode, configuration, 4, true);
                    Assertions.assertThat(runJobAndGetExternalizedCheckpoint3).isNotNull();
                    CheckpointMetadata loadCheckpointMetadata3 = TestUtils.loadCheckpointMetadata(runJobAndGetExternalizedCheckpoint3);
                    verifyStateHandleType(loadCheckpointMetadata3, z2);
                    verifyCheckpointExistOrWaitDeleted(runJobAndGetExternalizedCheckpoint2, determineFileExist(restoreMode, z2, z2), z2, loadCheckpointMetadata2);
                    miniClusterWithClientResource3.after();
                    EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend4 = new EmbeddedRocksDBStateBackend();
                    embeddedRocksDBStateBackend4.configure(configuration, Thread.currentThread().getContextClassLoader());
                    miniClusterWithClientResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(3).setNumberSlotsPerTaskManager(2).build());
                    miniClusterWithClientResource.before();
                    try {
                        String runJobAndGetExternalizedCheckpoint4 = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(embeddedRocksDBStateBackend4, runJobAndGetExternalizedCheckpoint3, miniClusterWithClientResource, restoreMode, configuration, 4, false);
                        Assertions.assertThat(runJobAndGetExternalizedCheckpoint4).isNotNull();
                        verifyCheckpointExistOrWaitDeleted(runJobAndGetExternalizedCheckpoint3, determineFileExist(restoreMode, z2, z2), z2, loadCheckpointMetadata3);
                        verifyCheckpointExistOrWaitDeleted(runJobAndGetExternalizedCheckpoint4, TernaryBoolean.FALSE, z2, null);
                        miniClusterWithClientResource.after();
                    } finally {
                        miniClusterWithClientResource.after();
                    }
                } finally {
                    miniClusterWithClientResource3.after();
                }
            } finally {
            }
        } finally {
        }
    }

    private void verifyStateHandleType(CheckpointMetadata checkpointMetadata, boolean z) throws IOException {
        boolean z2 = false;
        Iterator it = checkpointMetadata.getOperatorStates().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((OperatorState) it.next()).getStates().iterator();
            while (it2.hasNext()) {
                for (IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle : new ArrayList((Collection) ((OperatorSubtaskState) it2.next()).getManagedKeyedState())) {
                    Assertions.assertThat(incrementalRemoteKeyedStateHandle).isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
                    incrementalRemoteKeyedStateHandle.streamSubHandles().forEach(streamStateHandle -> {
                        if (z) {
                            Assertions.assertThat(streamStateHandle).isInstanceOf(SegmentFileStateHandle.class);
                        } else {
                            Assertions.assertThat(streamStateHandle).isNotInstanceOf(SegmentFileStateHandle.class);
                        }
                    });
                    z2 = true;
                }
            }
        }
        Assertions.assertThat(z2).isTrue();
    }

    private static TernaryBoolean determineFileExist(RestoreMode restoreMode, boolean z, boolean z2) {
        return restoreMode == RestoreMode.CLAIM ? (z || z2) ? TernaryBoolean.FALSE : TernaryBoolean.UNDEFINED : TernaryBoolean.TRUE;
    }

    private void verifyCheckpointExistOrWaitDeleted(String str, TernaryBoolean ternaryBoolean, boolean z, CheckpointMetadata checkpointMetadata) throws Exception {
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(str);
        FileSystem fileSystem = path.getFileSystem();
        org.apache.flink.core.fs.Path parent = path.getParent();
        org.apache.flink.core.fs.Path path2 = new org.apache.flink.core.fs.Path(parent, "shared");
        org.apache.flink.core.fs.Path path3 = new org.apache.flink.core.fs.Path(parent, "taskowned");
        Assertions.assertThat(fileSystem.exists(parent)).isTrue();
        Assertions.assertThat(fileSystem.exists(path2)).isTrue();
        Assertions.assertThat(fileSystem.exists(path3)).isTrue();
        if (ternaryBoolean.equals(TernaryBoolean.TRUE)) {
            Assertions.assertThat(fileSystem.exists(path)).isTrue();
            Assertions.assertThat(checkpointMetadata == null || verifyCheckpointExist(checkpointMetadata, true)).isTrue();
            Assertions.assertThat(fileSystem.listStatus(path3) != null && fileSystem.listStatus(path3).length > 0).isEqualTo(z);
        } else if (ternaryBoolean.equals(TernaryBoolean.FALSE)) {
            long j = 0;
            boolean z2 = true;
            while (z2) {
                try {
                    z2 = fileSystem.exists(path) || !((checkpointMetadata == null || verifyCheckpointExist(checkpointMetadata, false)) && verifyCheckpointNoDirectory(fileSystem, path2, path3));
                } catch (IOException e) {
                }
                if (z2) {
                    Thread.sleep(500L);
                    j += 500;
                    if (j >= DELETE_TIMEOUT_MILLS) {
                        Assertions.assertThat(fileSystem.exists(path)).isFalse();
                        Assertions.assertThat(fileSystem.listStatus(path2)).isNullOrEmpty();
                        Assertions.assertThat(fileSystem.listStatus(path3)).isNullOrEmpty();
                    }
                }
            }
        }
    }

    private boolean verifyCheckpointExist(CheckpointMetadata checkpointMetadata, boolean z) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Iterator it = checkpointMetadata.getOperatorStates().iterator();
        while (it.hasNext()) {
            for (OperatorSubtaskState operatorSubtaskState : ((OperatorState) it.next()).getStates()) {
                for (IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle : new ArrayList((Collection) operatorSubtaskState.getManagedKeyedState())) {
                    Assertions.assertThat(incrementalRemoteKeyedStateHandle).isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
                    atomicBoolean.set(atomicBoolean.get() || incrementalRemoteKeyedStateHandle.streamSubHandles().allMatch(streamStateHandle -> {
                        try {
                            if (streamStateHandle instanceof FileStateHandle) {
                                org.apache.flink.core.fs.Path filePath = ((FileStateHandle) streamStateHandle).getFilePath();
                                return z == filePath.getFileSystem().exists(filePath);
                            }
                            if (!(streamStateHandle instanceof SegmentFileStateHandle)) {
                                return true;
                            }
                            org.apache.flink.core.fs.Path filePath2 = ((SegmentFileStateHandle) streamStateHandle).getFilePath();
                            return z == filePath2.getFileSystem().exists(filePath2);
                        } catch (IOException e) {
                            this.log.warn("An error occurred when trying to check the file existence.", e);
                            return false;
                        }
                    }));
                    if (!atomicBoolean.get()) {
                        break;
                    }
                }
                if (!atomicBoolean.get()) {
                    break;
                }
                for (FileMergingOperatorStreamStateHandle fileMergingOperatorStreamStateHandle : new ArrayList((Collection) operatorSubtaskState.getManagedOperatorState())) {
                    if (fileMergingOperatorStreamStateHandle instanceof FileMergingOperatorStreamStateHandle) {
                        try {
                            org.apache.flink.core.fs.Path directory = fileMergingOperatorStreamStateHandle.getSharedDirHandle().getDirectory();
                            if (z != directory.getFileSystem().exists(directory)) {
                                atomicBoolean.set(false);
                            }
                            org.apache.flink.core.fs.Path directory2 = fileMergingOperatorStreamStateHandle.getTaskOwnedDirHandle().getDirectory();
                            if (z != directory2.getFileSystem().exists(directory2)) {
                                atomicBoolean.set(false);
                            }
                        } catch (IOException e) {
                            this.log.warn("An error occurred when trying to check the file existence.", e);
                            atomicBoolean.set(false);
                        }
                    }
                }
                if (!atomicBoolean.get()) {
                    break;
                }
            }
        }
        return atomicBoolean.get();
    }

    private static boolean verifyCheckpointNoDirectory(FileSystem fileSystem, org.apache.flink.core.fs.Path path, org.apache.flink.core.fs.Path path2) throws IOException {
        FileStatus[] listStatus = fileSystem.listStatus(path);
        if (listStatus != null) {
            for (FileStatus fileStatus : listStatus) {
                if (fileStatus.isDir()) {
                    return false;
                }
            }
        }
        FileStatus[] listStatus2 = fileSystem.listStatus(path2);
        if (listStatus2 == null) {
            return true;
        }
        for (FileStatus fileStatus2 : listStatus2) {
            if (fileStatus2.isDir()) {
                return false;
            }
        }
        return true;
    }
}
