package org.apache.flink.core.fs;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/core/fs/AbstractRecoverableWriterTest.class */
public abstract class AbstractRecoverableWriterTest extends TestLogger {
    /** Source of randomness for the per-test directory names produced by {@code randomName()}. */
    private static final Random RND = new Random();
    // Payload fragments written by the tests; expected file contents are concatenations of these.
    private static final String testData1 = "THIS IS A TEST 1.";
    private static final String testData2 = "THIS IS A TEST 2.";
    private static final String testData3 = "THIS IS A TEST 3.";
    /** Directory the current test works in; assigned in {@code prepare()} and deleted in {@code cleanup()}. */
    private Path basePathForTest;
    /** File system under test; created on first use by {@code getFileSystem()} and kept in this static field. */
    private static FileSystem fileSystem;
    // Keys under which testResumeAfterMultiplePersist stores the recoverable taken at each stage.
    /** Persist taken right after the stream is opened, before any data is written. */
    private static final String INIT_EMPTY_PERSIST = "EMPTY";
    /** Persist taken after the first payload has been written. */
    private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
    /** Second persist taken directly after the previous one, with no data written in between. */
    private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE";
    /** Persist taken after the second payload has been written. */
    private static final String FINAL_WITH_EXTRA_STATE = "FINAL";

    /**
     * Supplies the parent path under which {@code prepare()} creates a randomly named
     * directory for each test.
     *
     * @return the parent path for the per-test directories
     * @throws Exception if the implementation cannot provide the path
     */
    public abstract Path getBasePath() throws Exception;

    /**
     * Creates the file system the tests run against. {@code getFileSystem()} calls this only
     * while the static {@code fileSystem} field is still {@code null} and stores the result there.
     *
     * @return the file system to test
     * @throws Exception if the implementation cannot create the file system
     */
    public abstract FileSystem initializeFileSystem() throws Exception;

    /**
     * Gives access to the directory that {@code prepare()} assigned to the running test.
     *
     * @return the current per-test directory, or {@code null} before {@code prepare()} has run
     */
    public Path getBasePathForTest() {
        return basePathForTest;
    }

    /**
     * Returns the file system under test, obtaining it from {@link #initializeFileSystem()}
     * the first time it is needed and reusing the stored instance afterwards.
     */
    private FileSystem getFileSystem() throws Exception {
        if (fileSystem != null) {
            return fileSystem;
        }
        fileSystem = initializeFileSystem();
        return fileSystem;
    }

    /** Asks the file system under test for a fresh {@link RecoverableWriter}. */
    private RecoverableWriter getNewFileSystemWriter() throws Exception {
        final FileSystem fs = getFileSystem();
        return fs.createRecoverableWriter();
    }

    /** Picks a randomly named directory below the base path and creates it for the upcoming test. */
    @Before
    public void prepare() throws Exception {
        final Path testDir = new Path(getBasePath(), randomName());
        basePathForTest = testDir;
        getFileSystem().mkdirs(testDir);
    }

    /** Recursively deletes the directory that was created for the finished test. */
    @After
    public void cleanup() throws Exception {
        final FileSystem fs = getFileSystem();
        fs.delete(basePathForTest, true);
    }

    /**
     * Opens a stream, writes nothing and commits it. Every file listed in the test directory
     * must be an empty {@code .part-0.inprogress.*} file before the commit and an empty
     * {@code part-0} file afterwards.
     */
    @Test
    public void testCloseWithNoData() throws Exception {
        final RecoverableWriter writer = getNewFileSystemWriter();
        final Path testDir = getBasePathForTest();
        final Path path = new Path(testDir, "part-0");

        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(path);
            for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
                Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
                Assert.assertTrue(fileContents.getValue().isEmpty());
            }

            stream.closeForCommit().commit();

            for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
                Assert.assertEquals("part-0", fileContents.getKey().getName());
                Assert.assertTrue(fileContents.getValue().isEmpty());
            }
        } finally {
            // Release the stream even when an assertion or the commit fails, as the other tests do.
            IOUtils.closeQuietly(stream);
        }
    }

    /**
     * Writes one payload, commits, and checks that every file listed in the test directory
     * is named {@code part-0} and holds exactly that payload.
     */
    @Test
    public void testCommitAfterNormalClose() throws Exception {
        final RecoverableWriter writer = getNewFileSystemWriter();
        final Path testDir = getBasePathForTest();

        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(new Path(testDir, "part-0"));
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            stream.closeForCommit().commit();

            final Map<Path, String> files = getFileContentByPath(testDir);
            for (Map.Entry<Path, String> fileContents : files.entrySet()) {
                Assert.assertEquals("part-0", fileContents.getKey().getName());
                Assert.assertEquals(testData1, fileContents.getValue());
            }
        } finally {
            IOUtils.closeQuietly(stream);
        }
    }

    /**
     * Writes a payload, persists, writes a second payload and commits. Every file listed in
     * the test directory must be named {@code part-0} and hold both payloads in order.
     */
    @Test
    public void testCommitAfterPersist() throws Exception {
        final RecoverableWriter writer = getNewFileSystemWriter();
        final Path testDir = getBasePathForTest();

        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(new Path(testDir, "part-0"));
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            stream.persist();
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            stream.closeForCommit().commit();

            final Map<Path, String> files = getFileContentByPath(testDir);
            for (Map.Entry<Path, String> fileContents : files.entrySet()) {
                Assert.assertEquals("part-0", fileContents.getKey().getName());
                Assert.assertEquals(testData1 + testData2, fileContents.getValue());
            }
        } finally {
            IOUtils.closeQuietly(stream);
        }
    }

    /** Resumes from the persist taken before any data was written. */
    @Test
    public void testRecoverWithEmptyState() throws Exception {
        final String expectedAfterRecovery = "";
        final String expectedAfterCommit = testData3;
        testResumeAfterMultiplePersist(INIT_EMPTY_PERSIST, expectedAfterRecovery, expectedAfterCommit);
    }

    /** Resumes from the persist taken after the first payload was written. */
    @Test
    public void testRecoverWithState() throws Exception {
        final String expectedAfterCommit = testData1 + testData3;
        testResumeAfterMultiplePersist(INTERM_WITH_STATE_PERSIST, testData1, expectedAfterCommit);
    }

    /** Resumes from the second of two back-to-back persists taken after the first payload. */
    @Test
    public void testRecoverFromIntermWithoutAdditionalState() throws Exception {
        final String expectedAfterCommit = testData1 + testData3;
        testResumeAfterMultiplePersist(
                INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, testData1, expectedAfterCommit);
    }

    /** Resumes from the last persist, taken after both payloads were written. */
    @Test
    public void testRecoverAfterMultiplePersistsState() throws Exception {
        final String expectedAfterRecovery = testData1 + testData2;
        final String expectedAfterCommit = testData1 + testData2 + testData3;
        testResumeAfterMultiplePersist(FINAL_WITH_EXTRA_STATE, expectedAfterRecovery, expectedAfterCommit);
    }

    /**
     * Shared scenario for the recover tests: takes four persists at different stages of one
     * stream, serializes the selected one, and resumes from it with a freshly created writer.
     *
     * <p>After the resume the test directory must hold exactly one in-progress file with the
     * expected recovered contents. After writing the third payload and committing, it must
     * hold exactly one {@code part-0} file with the expected final contents.
     *
     * @param str key of the persist to resume from (one of the {@code *_PERSIST} /
     *     {@code FINAL_WITH_EXTRA_STATE} constants)
     * @param str2 contents the in-progress file must have right after the resume
     * @param str3 contents the committed file must have at the end
     * @throws Exception if any file system operation fails
     */
    private void testResumeAfterMultiplePersist(String str, String str2, String str3) throws Exception {
        final Path testDir = getBasePathForTest();
        final Path path = new Path(testDir, "part-0");

        final RecoverableWriter initWriter = getNewFileSystemWriter();
        final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);

        RecoverableFsDataOutputStream stream = null;
        try {
            stream = initWriter.open(path);
            recoverables.put(INIT_EMPTY_PERSIST, stream.persist());

            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
            recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());

            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
        } finally {
            // Close on every path so a failing persist/write does not leak the stream.
            IOUtils.closeQuietly(stream);
        }

        // Round-trip the recoverable through its serializer, reading it back with a new writer.
        final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer =
                initWriter.getResumeRecoverableSerializer();
        final byte[] serializedRecoverable = serializer.serialize(recoverables.get(str));

        final RecoverableWriter newWriter = getNewFileSystemWriter();
        final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer =
                newWriter.getResumeRecoverableSerializer();
        final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
                deserializer.deserialize(serializer.getVersion(), serializedRecoverable);

        RecoverableFsDataOutputStream recoveredStream = null;
        try {
            recoveredStream = newWriter.recover(recoveredRecoverable);

            final Map<Path, String> filesAfterRecovery = getFileContentByPath(testDir);
            Assert.assertEquals(1L, filesAfterRecovery.size());
            for (Map.Entry<Path, String> fileContents : filesAfterRecovery.entrySet()) {
                Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
                Assert.assertEquals(str2, fileContents.getValue());
            }

            recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
            recoveredStream.closeForCommit().commit();

            final Map<Path, String> filesAfterCommit = getFileContentByPath(testDir);
            Assert.assertEquals(1L, filesAfterCommit.size());
            for (Map.Entry<Path, String> fileContents : filesAfterCommit.entrySet()) {
                Assert.assertEquals("part-0", fileContents.getKey().getName());
                Assert.assertEquals(str3, fileContents.getValue());
            }
        } finally {
            IOUtils.closeQuietly(recoveredStream);
        }
    }

    /**
     * Writes two payloads (with two persists in between), closes for commit without
     * committing, serializes the commit recoverable, and then commits it through a freshly
     * created writer. The test directory must end up with exactly one {@code part-0} file
     * holding both payloads.
     */
    @Test
    public void testCommitAfterRecovery() throws Exception {
        final Path testDir = getBasePathForTest();
        final Path path = new Path(testDir, "part-0");

        final RecoverableWriter initWriter = getNewFileSystemWriter();

        final RecoverableWriter.CommitRecoverable recoverable;
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = initWriter.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));

            stream.persist();
            stream.persist();

            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            recoverable = stream.closeForCommit().getRecoverable();
        } finally {
            // The stream is only needed for the write phase; close it exactly once here.
            IOUtils.closeQuietly(stream);
        }

        final byte[] serializedRecoverable =
                initWriter.getCommitRecoverableSerializer().serialize(recoverable);

        // Read the recoverable back with a new writer and finish the commit from there.
        final RecoverableWriter newWriter = getNewFileSystemWriter();
        final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> deserializer =
                newWriter.getCommitRecoverableSerializer();
        final RecoverableWriter.CommitRecoverable recoveredRecoverable =
                deserializer.deserialize(deserializer.getVersion(), serializedRecoverable);

        newWriter.recoverForCommit(recoveredRecoverable).commitAfterRecovery();

        final Map<Path, String> files = getFileContentByPath(testDir);
        Assert.assertEquals(1L, files.size());
        for (Map.Entry<Path, String> fileContents : files.entrySet()) {
            Assert.assertEquals("part-0", fileContents.getKey().getName());
            Assert.assertEquals(testData1 + testData2, fileContents.getValue());
        }
    }

    /**
     * Expects an {@link IOException} when data is written to a stream on which
     * {@code closeForCommit()} has already been called.
     */
    @Test(expected = IOException.class)
    public void testExceptionWritingAfterCloseForCommit() throws Exception {
        RecoverableFsDataOutputStream stream = null;
        try {
            final RecoverableWriter writer = getNewFileSystemWriter();
            stream = writer.open(new Path(getBasePathForTest(), "part-0"));
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));

            stream.closeForCommit().getRecoverable();

            // Writing again is the call under test; reaching fail() means no exception was thrown.
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            Assert.fail();
        } finally {
            IOUtils.closeQuietly(stream);
        }
    }

    /**
     * Expects an {@link IOException} when resuming from a persist point of a file that has
     * already been committed.
     */
    @Test(expected = IOException.class)
    public void testResumeAfterCommit() throws Exception {
        final Path testDir = getBasePathForTest();
        final RecoverableWriter writer = getNewFileSystemWriter();
        final Path path = new Path(testDir, "part-0");

        final RecoverableWriter.ResumeRecoverable recoverable;
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));

            recoverable = stream.persist();
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));

            stream.closeForCommit().commit();
        } finally {
            // Close once, here; the resume attempt below does not touch this stream again.
            IOUtils.closeQuietly(stream);
        }

        // The file is committed, so resuming is the call expected to throw.
        RecoverableFsDataOutputStream resumed = null;
        try {
            resumed = writer.recover(recoverable);
            Assert.fail();
        } finally {
            // Only non-null if recover() unexpectedly succeeded; do not leak it in that case.
            IOUtils.closeQuietly(resumed);
        }
    }

    /**
     * Takes two persist points on one stream, then resumes from the earlier one (must
     * succeed) and afterwards from the later one (must fail with an {@link IOException}).
     *
     * <p>NOTE(review): the second resume presumably fails because resuming from the first
     * point already discarded the data written after it — confirm against the writer
     * implementations.
     */
    @Test
    public void testResumeWithWrongOffset() throws Exception {
        final Path testDir = getBasePathForTest();
        final RecoverableWriter writer = getNewFileSystemWriter();
        final Path path = new Path(testDir, "part-0");

        final RecoverableWriter.ResumeRecoverable firstRecoverable;
        final RecoverableWriter.ResumeRecoverable secondRecoverable;
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));

            firstRecoverable = stream.persist();
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));

            secondRecoverable = stream.persist();
            stream.write(testData3.getBytes(StandardCharsets.UTF_8));
        } finally {
            IOUtils.closeQuietly(stream);
        }

        // Resuming from the first persist point, and closing the resumed stream, must work.
        try (RecoverableFsDataOutputStream ignored = writer.recover(firstRecoverable)) {
            // nothing to do: opening and closing without an exception is the check
        } catch (Exception e) {
            // Keep the cause so an unexpected failure is diagnosable from the test report.
            throw new AssertionError("Resuming from the first persist point should succeed.", e);
        }

        // Resuming from the second persist point must throw.
        try (RecoverableFsDataOutputStream ignored = writer.recover(secondRecoverable)) {
            // Reached only if recover() did not throw; the stream is still closed on the way out.
            Assert.fail();
        } catch (IOException expected) {
            // expected outcome
        }
    }

    /**
     * Reads every entry that {@code listStatus} reports for the given directory and returns
     * the contents decoded as UTF-8, keyed by file path.
     *
     * @param path directory whose entries are read
     * @return map from each listed path to that file's contents
     * @throws Exception if listing, opening or reading a file fails
     */
    private Map<Path, String> getFileContentByPath(Path path) throws Exception {
        final FileSystem fs = getFileSystem();
        final Map<Path, String> contents = new HashMap<>();

        for (FileStatus status : fs.listStatus(path)) {
            final Path file = status.getPath();
            final byte[] buffer = new byte[(int) status.getLen()];
            int filled = 0;

            // try-with-resources: the stream was previously never closed.
            try (FSDataInputStream in = fs.open(file)) {
                // A single read() may return fewer bytes than requested, so loop until the
                // buffer is full or the stream ends.
                while (filled < buffer.length) {
                    final int read = in.read(buffer, filled, buffer.length - filled);
                    if (read < 0) {
                        break;
                    }
                    filled += read;
                }
            }

            // Decode only the bytes actually read so a short file does not yield trailing NULs.
            contents.put(file, new String(buffer, 0, filled, StandardCharsets.UTF_8));
        }
        return contents;
    }

    /** Produces a 16-character random name from the letters {@code 'a'} to {@code 'z'}. */
    private static String randomName() {
        final int length = 16;
        return StringUtils.getRandomString(RND, length, length, 'a', 'z');
    }
}
