package org.apache.flink.runtime.state.filesystem;

import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.class */
/**
 * Tests for {@link FsCheckpointStorage}: where savepoint and task-owned state is placed, which
 * directories are (not) created, and which file system a resolved storage location uses.
 *
 * <p>The generic storage contract is exercised through the two factory methods this class
 * provides to {@link AbstractFileCheckpointStorageTestBase}.
 */
public class FsCheckpointStorageTest extends AbstractFileCheckpointStorageTestBase {
    /** File size threshold handed to every storage/location created with default settings. */
    private static final int FILE_SIZE_THRESHOLD = 1024;

    /** Write buffer size handed to every storage/location created in this test. */
    private static final int WRITE_BUFFER_SIZE = 4096;

    /**
     * A {@link Path} that always reports the {@link FileSystem} it was constructed with,
     * which lets a test inject a mocked file system for an arbitrary URI.
     */
    private static final class TestingPath extends Path {
        private static final long serialVersionUID = 2560119808844230488L;

        @Nonnull
        private final transient FileSystem fileSystem;

        TestingPath(String str, @Nonnull FileSystem fileSystem) {
            super(str);
            this.fileSystem = fileSystem;
        }

        /** Returns the injected file system instead of resolving one from the path. */
        @Override
        public FileSystem getFileSystem() throws IOException {
            return this.fileSystem;
        }
    }

    /** Creates a storage rooted at {@code path} with no default savepoint directory. */
    @Override // org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageTestBase
    protected CheckpointStorage createCheckpointStorage(Path path) throws Exception {
        return new FsCheckpointStorage(path, (Path) null, new JobID(), FILE_SIZE_THRESHOLD, WRITE_BUFFER_SIZE);
    }

    /** Creates a storage rooted at {@code path} that uses {@code path2} as savepoint directory. */
    @Override // org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageTestBase
    protected CheckpointStorage createCheckpointStorageWithSavepointDir(Path path, Path path2) throws Exception {
        return new FsCheckpointStorage(path, path2, new JobID(), FILE_SIZE_THRESHOLD, WRITE_BUFFER_SIZE);
    }

    /**
     * A savepoint written to the configured default savepoint directory must have its
     * checkpoint, shared-state and task-owned directories directly below that directory.
     */
    @Test
    public void testSavepointsInOneDirectoryDefaultLocation() throws Exception {
        final Path defaultSavepointDir = Path.fromLocalFile(this.tmp.newFolder());
        final FsCheckpointStorage storage = new FsCheckpointStorage(
                Path.fromLocalFile(this.tmp.newFolder()), defaultSavepointDir, new JobID(), FILE_SIZE_THRESHOLD, WRITE_BUFFER_SIZE);

        // Explicit cast: the directory accessors below are specific to FsCheckpointStorageLocation.
        final FsCheckpointStorageLocation savepointLocation =
                (FsCheckpointStorageLocation) storage.initializeLocationForSavepoint(52452L, (String) null);

        assertParent(defaultSavepointDir, savepointLocation.getCheckpointDirectory());
        assertParent(defaultSavepointDir, savepointLocation.getSharedStateDirectory());
        assertParent(defaultSavepointDir, savepointLocation.getTaskOwnedStateDirectory());

        // Clean up the location created for the savepoint.
        savepointLocation.disposeOnFailure();
    }

    /**
     * Same as {@link #testSavepointsInOneDirectoryDefaultLocation()}, but the savepoint target
     * is passed explicitly as a location pointer and no default savepoint directory is set.
     */
    @Test
    public void testSavepointsInOneDirectoryCustomLocation() throws Exception {
        final Path customSavepointDir = Path.fromLocalFile(this.tmp.newFolder());
        final FsCheckpointStorage storage = new FsCheckpointStorage(
                Path.fromLocalFile(this.tmp.newFolder()), (Path) null, new JobID(), FILE_SIZE_THRESHOLD, WRITE_BUFFER_SIZE);

        // Explicit cast: the directory accessors below are specific to FsCheckpointStorageLocation.
        final FsCheckpointStorageLocation savepointLocation =
                (FsCheckpointStorageLocation) storage.initializeLocationForSavepoint(52452L, customSavepointDir.toString());

        assertParent(customSavepointDir, savepointLocation.getCheckpointDirectory());
        assertParent(customSavepointDir, savepointLocation.getSharedStateDirectory());
        assertParent(customSavepointDir, savepointLocation.getTaskOwnedStateDirectory());

        // Clean up the location created for the savepoint.
        savepointLocation.disposeOnFailure();
    }

    /**
     * State written through the task-owned stream must end up in a file whose parent directory
     * is named {@code "taskowned"}, and must be readable back unchanged.
     */
    @Test
    public void testTaskOwnedStateStream() throws Exception {
        final List<String> state = Arrays.asList("Flopsy", "Mopsy", "Cotton Tail", "Peter");

        // A threshold of 10 is used here instead of FILE_SIZE_THRESHOLD; the test then expects
        // the resulting handle to be file-backed.
        final FsCheckpointStorage storage = new FsCheckpointStorage(
                Path.fromLocalFile(this.tmp.newFolder()), (Path) null, new JobID(), 10, WRITE_BUFFER_SIZE);

        final FileStateHandle fileHandle;
        try (CheckpointStreamFactory.CheckpointStateOutputStream out = storage.createTaskOwnedStateStream()) {
            Assert.assertTrue(out instanceof FsCheckpointStreamFactory.FsCheckpointStateOutputStream);

            new ObjectOutputStream(out).writeObject(state);
            // The cast fails the test with a ClassCastException if the state was not written to a file.
            fileHandle = (FileStateHandle) out.closeAndGetHandle();
        }

        // The file must live in the task-owned directory.
        Assert.assertEquals("taskowned", fileHandle.getFilePath().getParent().getName());

        // The contents must round-trip.
        try (ObjectInputStream in = new ObjectInputStream(fileHandle.openInputStream())) {
            Assert.assertEquals(state, in.readObject());
        }
    }

    /**
     * Exclusive-scope streams must create files only in the checkpoint directory and
     * shared-scope streams only in the shared-state directory.
     */
    @Test
    public void testDirectoriesForExclusiveAndSharedState() throws Exception {
        // The same file system instance is used to build the location and to inspect the directories.
        final FileSystem fs = LocalFileSystem.getSharedInstance();

        final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
                fs,
                randomTempPath(),
                randomTempPath(),
                randomTempPath(),
                CheckpointStorageLocationReference.getDefault(),
                FILE_SIZE_THRESHOLD,
                WRITE_BUFFER_SIZE);

        Assert.assertNotEquals(location.getCheckpointDirectory(), location.getSharedStateDirectory());

        // Nothing has been written yet.
        Assert.assertEquals(0L, fs.listStatus(location.getCheckpointDirectory()).length);
        Assert.assertEquals(0L, fs.listStatus(location.getSharedStateDirectory()).length);

        // Write exclusive state: one file in the checkpoint directory, none in the shared one.
        final FsCheckpointStreamFactory.FsCheckpointStateOutputStream exclusiveStream =
                location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        exclusiveStream.write(42);
        exclusiveStream.flush();
        final StreamStateHandle exclusiveHandle = exclusiveStream.closeAndGetHandle();

        Assert.assertEquals(1L, fs.listStatus(location.getCheckpointDirectory()).length);
        Assert.assertEquals(0L, fs.listStatus(location.getSharedStateDirectory()).length);

        // Write shared state: the checkpoint directory is unchanged, the shared one gains a file.
        final FsCheckpointStreamFactory.FsCheckpointStateOutputStream sharedStream =
                location.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
        sharedStream.write(42);
        sharedStream.flush();
        final StreamStateHandle sharedHandle = sharedStream.closeAndGetHandle();

        Assert.assertEquals(1L, fs.listStatus(location.getCheckpointDirectory()).length);
        Assert.assertEquals(1L, fs.listStatus(location.getSharedStateDirectory()).length);

        // Remove the files written above.
        exclusiveHandle.discardState();
        sharedHandle.discardState();
    }

    /**
     * The storage's base checkpoint directory must exist after construction, while merely
     * resolving a checkpoint storage location must not create that checkpoint's directory.
     */
    @Test
    public void testStorageLocationDoesNotMkdirs() throws Exception {
        final FsCheckpointStorage storage = new FsCheckpointStorage(
                randomTempPath(), (Path) null, new JobID(), FILE_SIZE_THRESHOLD, WRITE_BUFFER_SIZE);

        final File baseDir = new File(storage.getCheckpointsDirectory().getPath());
        Assert.assertTrue(baseDir.exists());

        // Explicit cast: getCheckpointDirectory() is specific to FsCheckpointStorageLocation.
        final FsCheckpointStorageLocation location = (FsCheckpointStorageLocation)
                storage.resolveCheckpointStorageLocation(177L, CheckpointStorageLocationReference.getDefault());

        final File checkpointDir = new File(location.getCheckpointDirectory().getPath());
        Assert.assertFalse(checkpointDir.exists());
    }

    /**
     * Resolving with the default reference must use the file system of the checkpoint base
     * path; resolving with an encoded {@code file:} path must use a {@link LocalFileSystem}.
     */
    @Test
    public void testResolveCheckpointStorageLocation() throws Exception {
        final FileSystem checkpointFileSystem = Mockito.mock(FileSystem.class);
        final FsCheckpointStorage storage = new FsCheckpointStorage(
                new TestingPath("hdfs:///checkpoint/", checkpointFileSystem),
                (Path) null,
                new JobID(),
                FILE_SIZE_THRESHOLD,
                WRITE_BUFFER_SIZE);

        // Explicit casts: getFileSystem() is read from the FsCheckpointStorageLocation.
        final FsCheckpointStorageLocation defaultLocation = (FsCheckpointStorageLocation)
                storage.resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault());
        Assert.assertEquals(checkpointFileSystem, defaultLocation.getFileSystem());

        final CheckpointStorageLocationReference savepointReference =
                AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///savepoint/"));
        final FsCheckpointStorageLocation savepointLocation = (FsCheckpointStorageLocation)
                storage.resolveCheckpointStorageLocation(2L, savepointReference);
        Assert.assertTrue(savepointLocation.getFileSystem() instanceof LocalFileSystem);
    }

    /** Asserts that {@code path2} is a direct child of {@code path}. */
    private void assertParent(Path path, Path path2) {
        Assert.assertEquals(new Path(path, path2.getName()), path2);
    }
}
