package org.apache.flink.runtime.state.filesystem;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/CheckpointStateOutputStreamTest.class */
public class CheckpointStateOutputStreamTest extends TestLogger {

    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    @Parameterized.Parameter
    public CheckpointStateOutputStreamType stateOutputStreamType;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/filesystem/CheckpointStateOutputStreamTest$BlockerStream.class */
    public static class BlockerStream extends FSDataOutputStream {
        private final OneShotLatch blocker;

        private BlockerStream() {
            this.blocker = new OneShotLatch();
        }

        public long getPos() throws IOException {
            block();
            return 0L;
        }

        public void write(int i) throws IOException {
            block();
        }

        public void flush() throws IOException {
            block();
        }

        public void sync() throws IOException {
            block();
        }

        public void close() throws IOException {
            this.blocker.trigger();
        }

        private void block() throws IOException {
            try {
                this.blocker.await();
                throw new IOException("closed");
            } catch (InterruptedException e) {
                throw new IOException("interrupted");
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/filesystem/CheckpointStateOutputStreamTest$CheckpointStateOutputStreamType.class */
    public enum CheckpointStateOutputStreamType {
        FileBasedState,
        FsCheckpointMetaData
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/filesystem/CheckpointStateOutputStreamTest$FailingCloseStream.class */
    public static class FailingCloseStream extends LocalDataOutputStream {
        FailingCloseStream(File file) throws IOException {
            super(file);
        }

        public void close() throws IOException {
            throw new IOException();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/filesystem/CheckpointStateOutputStreamTest$FsWithoutRecoverableWriter.class */
    private static class FsWithoutRecoverableWriter extends LocalFileSystem {
        private final FunctionWithException<Path, FSDataOutputStream, IOException> streamFactory;

        FsWithoutRecoverableWriter(FunctionWithException<Path, FSDataOutputStream, IOException> functionWithException) {
            this.streamFactory = functionWithException;
        }

        public FSDataOutputStream create(Path path, FileSystem.WriteMode writeMode) throws IOException {
            return (FSDataOutputStream) this.streamFactory.apply(path);
        }

        /* renamed from: createRecoverableWriter, reason: merged with bridge method [inline-methods] */
        public LocalRecoverableWriter m727createRecoverableWriter() throws IOException {
            throw new UnsupportedOperationException("This file system does not support recoverable writers.");
        }
    }

    @Parameterized.Parameters
    public static Collection<CheckpointStateOutputStreamType> getCheckpointStateOutputStreamType() {
        return Arrays.asList(CheckpointStateOutputStreamType.values());
    }

    @Test
    public void testEmptyState() throws Exception {
        FileSystem localFileSystem = FileSystem.getLocalFileSystem();
        Path baseFolder = baseFolder();
        Path path = new Path(baseFolder, "myFileName");
        FSDataOutputStream createTestStream = createTestStream(localFileSystem, baseFolder, "myFileName");
        Throwable th = null;
        try {
            try {
                FileStateHandle closeAndGetResult = closeAndGetResult(createTestStream);
                if (createTestStream != null) {
                    if (0 != 0) {
                        try {
                            createTestStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestStream.close();
                    }
                }
                Assert.assertNotNull(closeAndGetResult);
                Assert.assertEquals(path, closeAndGetResult.getFilePath());
                Assert.assertTrue(localFileSystem.exists(closeAndGetResult.getFilePath()));
                Assert.assertFalse(localFileSystem.getFileStatus(path).isDir());
                FSDataInputStream openInputStream = closeAndGetResult.openInputStream();
                Throwable th3 = null;
                try {
                    Assert.assertEquals(-1L, openInputStream.read());
                    if (openInputStream != null) {
                        if (0 == 0) {
                            openInputStream.close();
                            return;
                        }
                        try {
                            openInputStream.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    if (openInputStream != null) {
                        if (0 != 0) {
                            try {
                                openInputStream.close();
                            } catch (Throwable th6) {
                                th3.addSuppressed(th6);
                            }
                        } else {
                            openInputStream.close();
                        }
                    }
                    throw th5;
                }
            } catch (Throwable th7) {
                th = th7;
                throw th7;
            }
        } catch (Throwable th8) {
            if (createTestStream != null) {
                if (th != null) {
                    try {
                        createTestStream.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createTestStream.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void testWriteAndRead() throws Exception {
        FileSystem localFileSystem = FileSystem.getLocalFileSystem();
        Path baseFolder = baseFolder();
        Random random = new Random();
        byte[] bArr = new byte[1694523];
        FSDataOutputStream createTestStream = createTestStream(localFileSystem, baseFolder, "fooBarName");
        Throwable th = null;
        int i = 0;
        while (i < bArr.length) {
            try {
                try {
                    if (random.nextBoolean()) {
                        int i2 = i;
                        i++;
                        createTestStream.write(bArr[i2]);
                    } else {
                        int nextInt = random.nextInt(Math.min(bArr.length - i, 32));
                        createTestStream.write(bArr, i, nextInt);
                        i += nextInt;
                    }
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (createTestStream != null) {
                    if (th != null) {
                        try {
                            createTestStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        createTestStream.close();
                    }
                }
                throw th3;
            }
        }
        FileStateHandle closeAndGetResult = closeAndGetResult(createTestStream);
        if (createTestStream != null) {
            if (0 != 0) {
                try {
                    createTestStream.close();
                } catch (Throwable th5) {
                    th.addSuppressed(th5);
                }
            } else {
                createTestStream.close();
            }
        }
        FSDataInputStream openInputStream = closeAndGetResult.openInputStream();
        Throwable th6 = null;
        try {
            try {
                byte[] bArr2 = new byte[bArr.length];
                readFully(openInputStream, bArr2);
                Assert.assertArrayEquals(bArr, bArr2);
                if (openInputStream != null) {
                    if (0 != 0) {
                        try {
                            openInputStream.close();
                        } catch (Throwable th7) {
                            th6.addSuppressed(th7);
                        }
                    } else {
                        openInputStream.close();
                    }
                }
                FSDataInputStream open = localFileSystem.open(closeAndGetResult.getFilePath());
                Throwable th8 = null;
                try {
                    try {
                        byte[] bArr3 = new byte[bArr.length];
                        readFully(open, bArr3);
                        Assert.assertArrayEquals(bArr, bArr3);
                        if (open != null) {
                            if (0 == 0) {
                                open.close();
                                return;
                            }
                            try {
                                open.close();
                            } catch (Throwable th9) {
                                th8.addSuppressed(th9);
                            }
                        }
                    } catch (Throwable th10) {
                        th8 = th10;
                        throw th10;
                    }
                } catch (Throwable th11) {
                    if (open != null) {
                        if (th8 != null) {
                            try {
                                open.close();
                            } catch (Throwable th12) {
                                th8.addSuppressed(th12);
                            }
                        } else {
                            open.close();
                        }
                    }
                    throw th11;
                }
            } catch (Throwable th13) {
                th6 = th13;
                throw th13;
            }
        } catch (Throwable th14) {
            if (openInputStream != null) {
                if (th6 != null) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th15) {
                        th6.addSuppressed(th15);
                    }
                } else {
                    openInputStream.close();
                }
            }
            throw th14;
        }
    }

    @Test
    public void testCleanupWhenClosingStream() throws IOException {
        FileSystem localFileSystem = FileSystem.getLocalFileSystem();
        Path path = new Path(this.tmp.newFolder().toURI());
        Path path2 = new Path(path, "nonCreativeTestFileName");
        FSDataOutputStream createTestStream = createTestStream(localFileSystem, path, "nonCreativeTestFileName");
        Throwable th = null;
        try {
            try {
                Random random = new Random();
                for (int i = 0; i < random.nextInt(1000); i++) {
                    createTestStream.write(random.nextInt(100));
                }
                if (createTestStream != null) {
                    if (0 != 0) {
                        try {
                            createTestStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestStream.close();
                    }
                }
                Assert.assertFalse(localFileSystem.exists(path2));
            } finally {
            }
        } catch (Throwable th3) {
            if (createTestStream != null) {
                if (th != null) {
                    try {
                        createTestStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestStream.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
        Path path = new Path(this.tmp.newFolder().toURI());
        Path path2 = new Path(path, "test_name");
        FileSystem fileSystem = (FileSystem) Mockito.spy(new FsWithoutRecoverableWriter(path3 -> {
            return new FailingCloseStream(new File(path3.getPath()));
        }));
        FSDataOutputStream createTestStream = createTestStream(fileSystem, path, "test_name");
        createTestStream.write(new byte[]{1, 2, 3, 4, 5});
        try {
            closeAndGetResult(createTestStream);
            Assert.fail("Expected IOException");
        } catch (IOException e) {
        }
        ((FileSystem) Mockito.verify(fileSystem)).delete(path2, false);
    }

    @Test
    public void testCloseDoesNotLock() throws Exception {
        final FSDataOutputStream createTestStream = createTestStream((FileSystem) Mockito.spy(new FsWithoutRecoverableWriter(path -> {
            return new BlockerStream();
        })), new Path(this.tmp.newFolder().toURI()), "this-is-ignored-anyways.file");
        final OneShotLatch oneShotLatch = new OneShotLatch();
        CheckedThread checkedThread = new CheckedThread() { // from class: org.apache.flink.runtime.state.filesystem.CheckpointStateOutputStreamTest.1
            public void go() throws Exception {
                oneShotLatch.trigger();
                CheckpointStateOutputStreamTest.this.closeAndGetResult(createTestStream);
            }
        };
        checkedThread.start();
        oneShotLatch.await();
        createTestStream.close();
        try {
            checkedThread.sync();
        } catch (IOException e) {
        }
    }

    private FSDataOutputStream createTestStream(FileSystem fileSystem, Path path, String str) throws IOException {
        switch (this.stateOutputStreamType) {
            case FileBasedState:
                return new FileBasedStateOutputStream(fileSystem, new Path(path, str));
            case FsCheckpointMetaData:
                return new FsCheckpointMetadataOutputStream(fileSystem, new Path(path, str), path);
            default:
                throw new IllegalStateException("Unsupported checkpoint stream output type.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public FileStateHandle closeAndGetResult(FSDataOutputStream fSDataOutputStream) throws IOException {
        switch (this.stateOutputStreamType) {
            case FileBasedState:
                return ((FileBasedStateOutputStream) fSDataOutputStream).closeAndGetHandle();
            case FsCheckpointMetaData:
                return ((FsCheckpointMetadataOutputStream) fSDataOutputStream).closeAndFinalizeCheckpoint().getMetadataHandle();
            default:
                throw new IllegalStateException("Unsupported checkpoint stream output type.");
        }
    }

    private Path baseFolder() throws Exception {
        return new Path(new File(this.tmp.newFolder(), UUID.randomUUID().toString()).toURI());
    }

    private static void readFully(InputStream inputStream, byte[] bArr) throws IOException {
        int i = 0;
        int length = bArr.length;
        while (true) {
            int i2 = length;
            if (i2 <= 0) {
                return;
            }
            int read = inputStream.read(bArr, i, i2);
            if (read == -1) {
                throw new EOFException();
            }
            i += read;
            length = i2 - read;
        }
    }
}
