package org.apache.apex.malhar.lib.wal;

import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.Slice;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;
import org.apache.apex.malhar.lib.join.POJOPartitionJoinOperatorTest;
import org.apache.apex.malhar.lib.utils.FileContextUtils;
import org.apache.apex.malhar.lib.wal.FileSystemWAL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/wal/FileSystemWALTest.class */
public class FileSystemWALTest {

    @Rule
    public TestMeta testMeta = new TestMeta();
    private static final Random RAND = new Random();
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWALTest.class);

    /* loaded from: input_file:org/apache/apex/malhar/lib/wal/FileSystemWALTest$TestMeta.class */
    private class TestMeta extends TestWatcher {
        private String targetDir;
        FileSystemWAL fsWAL;
        Configuration conf;
        FileSystem fs;

        private TestMeta() {
            this.fsWAL = new FileSystemWAL();
            this.conf = new Configuration();
        }

        protected void starting(Description description) {
            this.targetDir = "target/" + description.getClassName() + "/" + description.getMethodName();
            try {
                this.fs = FileSystem.get(new URI(this.targetDir + "/WAL"), this.conf);
                this.fs.delete(new Path(this.targetDir), true);
                this.fsWAL = new FileSystemWAL();
                this.fsWAL.setFilePath(this.targetDir + "/WAL");
            } catch (IOException | URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }

        protected void finished(Description description) {
            try {
                this.fs.delete(new Path(this.targetDir), true);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static Slice getRandomSlice(int i) {
        byte[] bArr = new byte[i];
        RAND.nextBytes(bArr);
        return new Slice(bArr);
    }

    @Test
    public void testSerde() throws IOException {
        Assert.assertNotNull("File System WAL", (FileSystemWAL) KryoCloneUtils.cloneObject(this.testMeta.fsWAL));
    }

    @Test
    public void testWalWriteAndRead() throws IOException {
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        for (int i = 0; i < 100; i++) {
            writer.append(getRandomSlice(RAND.nextInt(100)));
        }
        writer.rotate(true);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        this.testMeta.fsWAL.committed(0L);
        Assert.assertEquals("WAL file created ", true, Boolean.valueOf(this.testMeta.fs.isFile(new Path(this.testMeta.fsWAL.getPartFilePath(0)))));
        assertNumTuplesRead(this.testMeta.fsWAL.getReader(), 100);
        this.testMeta.fsWAL.teardown();
    }

    @Test
    public void testWalSkip() throws IOException {
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        int i = 0;
        long j = 0;
        for (int i2 = 0; i2 < 100; i2++) {
            if (i2 == 30) {
                j = i;
            }
            i += writer.append(getRandomSlice(100));
        }
        writer.rotate(true);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        this.testMeta.fsWAL.committed(0L);
        FileSystemWAL.FileSystemWALReader reader = this.testMeta.fsWAL.getReader();
        reader.seek(new FileSystemWAL.FileSystemWALPointer(0, j));
        assertNumTuplesRead(reader, 100 - 30);
        this.testMeta.fsWAL.teardown();
    }

    @Test
    public void testWalRolling() throws IOException {
        this.testMeta.fsWAL.setMaxLength(32768L);
        this.testMeta.fsWAL.setup();
        write1KRecords(this.testMeta.fsWAL.getWriter(), 100);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        this.testMeta.fsWAL.committed(0L);
        FileSystemWAL.FileSystemWALReader reader = this.testMeta.fsWAL.getReader();
        assertNumTuplesRead(reader, 100);
        reader.seek(new FileSystemWAL.FileSystemWALPointer(1, 0L));
        assertNumTuplesRead(reader, 100 - 32);
        reader.seek(new FileSystemWAL.FileSystemWALPointer(1, 16384L));
        assertNumTuplesRead(reader, 100 - 48);
        this.testMeta.fsWAL.teardown();
    }

    @Test
    public void testWalRollingWithPartialFiles() throws IOException {
        this.testMeta.fsWAL.setMaxLength(1024L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        writer.append(getRandomSlice(POJOPartitionJoinOperatorTest.TOTAL_TUPLES_PROCESS));
        writer.append(getRandomSlice(POJOPartitionJoinOperatorTest.TOTAL_TUPLES_PROCESS));
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        assertNumTuplesRead(this.testMeta.fsWAL.getReader(), 2);
        this.testMeta.fsWAL.teardown();
    }

    @Test
    public void testFinalizeAfterDelay() throws IOException {
        this.testMeta.fsWAL.setMaxLength(32768L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        write1KRecords(writer, 32);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        write1KRecords(writer, 32);
        this.testMeta.fsWAL.beforeCheckpoint(1L);
        write1KRecords(writer, 32);
        this.testMeta.fsWAL.beforeCheckpoint(2L);
        this.testMeta.fsWAL.committed(2L);
        assertNumTuplesRead(this.testMeta.fsWAL.getReader(), 96);
    }

    @Test
    public void testRecovery() throws IOException {
        this.testMeta.fsWAL.setMaxLength(2048L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        write1KRecords(writer, 3);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        this.testMeta.fsWAL.committed(0L);
        FileSystemWAL.FileSystemWALReader reader = this.testMeta.fsWAL.getReader();
        assertNumTuplesRead(reader, 3);
        writer.close();
        this.testMeta.fsWAL.setup();
        write1KRecords(writer, 1);
        this.testMeta.fsWAL.beforeCheckpoint(1L);
        this.testMeta.fsWAL.committed(1L);
        reader.seek(new FileSystemWAL.FileSystemWALPointer(1, 0L));
        assertNumTuplesRead(reader, 2);
        this.testMeta.fsWAL.teardown();
    }

    @Test
    public void testDeleteOfTmpFiles() throws IOException {
        this.testMeta.fsWAL.setMaxLength(2048L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        write1KRecords(writer, 2);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        FileSystemWAL fileSystemWAL = (FileSystemWAL) KryoCloneUtils.cloneObject(this.testMeta.fsWAL);
        write1KRecords(writer, 2);
        this.testMeta.fsWAL.beforeCheckpoint(1L);
        writer.close();
        fileSystemWAL.setup();
        write1KRecords(fileSystemWAL.getWriter(), 2);
        fileSystemWAL.beforeCheckpoint(1L);
        fileSystemWAL.committed(1L);
        assertNumTuplesRead(fileSystemWAL.getReader(), 4);
        fileSystemWAL.teardown();
    }

    @Test
    public void testReadWithInterceptingFinalize() throws IOException {
        this.testMeta.fsWAL.setMaxLength(2048L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        write1KRecords(writer, 1);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        FileSystemWAL.FileSystemWALReader reader = this.testMeta.fsWAL.getReader();
        Assert.assertNotNull("one entry", reader.next());
        write1KRecords(writer, 1);
        this.testMeta.fsWAL.beforeCheckpoint(1L);
        this.testMeta.fsWAL.committed(1L);
        assertNumTuplesRead(reader, 1);
        this.testMeta.fsWAL.teardown();
    }

    @Test
    public void testDeleteEverything() throws IOException {
        this.testMeta.fsWAL.setMaxLength(2048L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        write1KRecords(writer, 10);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        this.testMeta.fsWAL.committed(0L);
        assertNumTuplesRead(this.testMeta.fsWAL.getReader(), 10);
        writer.delete(writer.getPointer());
        FileContext fileContext = FileContextUtils.getFileContext(this.testMeta.fsWAL.getFilePath());
        for (int i = 0; i < 5; i++) {
            Assert.assertTrue("part exists " + i, !fileContext.util().exists(new Path(this.testMeta.fsWAL.getPartFilePath(i))));
        }
        this.testMeta.fsWAL.teardown();
    }

    @Test
    public void testDeleteFullParts() throws IOException {
        this.testMeta.fsWAL.setMaxLength(2048L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        write1KRecords(writer, 10);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        this.testMeta.fsWAL.committed(0L);
        FileSystemWAL.FileSystemWALReader reader = this.testMeta.fsWAL.getReader();
        assertNumTuplesRead(reader, 10);
        writer.delete(new FileSystemWAL.FileSystemWALPointer(3, 0L));
        reader.seek(reader.getStartPointer());
        assertNumTuplesRead(reader, 4);
    }

    @Test
    public void testDeletePartialParts() throws IOException {
        this.testMeta.fsWAL.setMaxLength(2048L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        write1KRecords(writer, 4);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        this.testMeta.fsWAL.committed(0L);
        FileSystemWAL.FileSystemWALReader reader = this.testMeta.fsWAL.getReader();
        assertNumTuplesRead(reader, 4);
        writer.delete(new FileSystemWAL.FileSystemWALPointer(1, 1024L));
        reader.seek(reader.getStartPointer());
        assertNumTuplesRead(reader, 1);
    }

    @Test
    public void testFinalizeWithDelete() throws IOException {
        this.testMeta.fsWAL.setMaxLength(2048L);
        this.testMeta.fsWAL.setup();
        FileSystemWAL.FileSystemWALWriter writer = this.testMeta.fsWAL.getWriter();
        write1KRecords(writer, 2);
        this.testMeta.fsWAL.beforeCheckpoint(0L);
        write1KRecords(writer, 2);
        this.testMeta.fsWAL.beforeCheckpoint(1L);
        write1KRecords(writer, 2);
        this.testMeta.fsWAL.beforeCheckpoint(2L);
        FileSystemWAL.FileSystemWALReader reader = this.testMeta.fsWAL.getReader();
        assertNumTuplesRead(reader, 6);
        this.testMeta.fsWAL.committed(0L);
        writer.delete(new FileSystemWAL.FileSystemWALPointer(2, 0L));
        FileContext fileContext = FileContextUtils.getFileContext(this.testMeta.fsWAL.getFilePath());
        Assert.assertTrue("part 0 exists ", !fileContext.util().exists(new Path(this.testMeta.fsWAL.getPartFilePath(0))));
        this.testMeta.fsWAL.committed(1L);
        Assert.assertTrue("part 1 exists ", !fileContext.util().exists(new Path(this.testMeta.fsWAL.getPartFilePath(1))));
        reader.seek(reader.getStartPointer());
        assertNumTuplesRead(reader, 2);
    }

    @Test
    public void testPointerComparisons() {
        FileSystemWAL.FileSystemWALPointer fileSystemWALPointer = new FileSystemWAL.FileSystemWALPointer(0, 10L);
        FileSystemWAL.FileSystemWALPointer fileSystemWALPointer2 = new FileSystemWAL.FileSystemWALPointer(0, 10L);
        Assert.assertTrue("equal", fileSystemWALPointer.compareTo(fileSystemWALPointer2) == 0);
        Assert.assertTrue("offset greater", new FileSystemWAL.FileSystemWALPointer(0, 11L).compareTo(fileSystemWALPointer2) == 1);
        Assert.assertTrue("offset smaller", new FileSystemWAL.FileSystemWALPointer(0, 3L).compareTo(fileSystemWALPointer2) == -1);
        Assert.assertTrue("part greater", new FileSystemWAL.FileSystemWALPointer(1, 10L).compareTo(fileSystemWALPointer2) == 1);
        Assert.assertTrue("part smaller", new FileSystemWAL.FileSystemWALPointer(0, 10L).compareTo(new FileSystemWAL.FileSystemWALPointer(3, 10L)) == -1);
    }

    private static void assertNumTuplesRead(FileSystemWAL.FileSystemWALReader fileSystemWALReader, int i) throws IOException {
        int i2 = 0;
        while (fileSystemWALReader.next() != null) {
            i2++;
        }
        Assert.assertEquals("num tuples", i, i2);
    }

    private static void write1KRecords(FileSystemWAL.FileSystemWALWriter fileSystemWALWriter, int i) throws IOException {
        for (int i2 = 0; i2 < i; i2++) {
            fileSystemWALWriter.append(getRandomSlice(1020));
        }
    }
}
