package org.apache.tephra.persist;

import co.cask.tephra.persist.TransactionEdit;
import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.TestHCM;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.tephra.metrics.MetricsCollector;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.AbstractTransactionLog;
import org.apache.tephra.util.TransactionEditUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/tephra/persist/HDFSTransactionLogTest.class */
public class HDFSTransactionLogTest {

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    private static final String LOG_FILE_PREFIX = "txlog.";
    private static MiniDFSCluster dfsCluster;
    private static Configuration conf;
    private static MetricsCollector metricsCollector;

    @BeforeClass
    public static void setupBeforeClass() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hdfs.minidfs.basedir", TMP_FOLDER.newFolder().getAbsolutePath());
        dfsCluster = new MiniDFSCluster.Builder(configuration).numDataNodes(1).build();
        conf = new Configuration(dfsCluster.getFileSystem().getConf());
        metricsCollector = new TxMetricsCollector();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        dfsCluster.shutdown();
    }

    private Configuration getConfiguration() throws IOException {
        conf.unset("data.tx.hdfs.user");
        conf.set("data.tx.snapshot.dir", TMP_FOLDER.newFolder().getAbsolutePath());
        return conf;
    }

    private HDFSTransactionLog getHDFSTransactionLog(Configuration configuration, FileSystem fileSystem, long j) throws Exception {
        return new HDFSTransactionLog(fileSystem, configuration, new Path(configuration.get("data.tx.snapshot.dir"), LOG_FILE_PREFIX + j), j, metricsCollector);
    }

    private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fileSystem, long j, byte b) throws IOException {
        Path path = new Path(configuration.get("data.tx.snapshot.dir"), LOG_FILE_PREFIX + j);
        SequenceFile.Metadata metadata = new SequenceFile.Metadata();
        if (b > 1) {
            metadata.set(new Text("version"), new Text(Byte.toString(b)));
        }
        switch (b) {
            case 1:
            case 2:
                return SequenceFile.createWriter(fileSystem, configuration, path, LongWritable.class, TransactionEdit.class, SequenceFile.CompressionType.NONE, (CompressionCodec) null, (Progressable) null, metadata);
            default:
                return SequenceFile.createWriter(fileSystem, configuration, path, LongWritable.class, TransactionEdit.class, SequenceFile.CompressionType.NONE, (CompressionCodec) null, (Progressable) null, metadata);
        }
    }

    private void writeNumWrites(SequenceFile.Writer writer, int i) throws Exception {
        CommitMarkerCodec.writeMarker(writer, i);
    }

    private void testCaskTransactionLogSync(int i, int i2, byte b, boolean z) throws Exception {
        List<TransactionEdit> createRandomCaskEdits = TransactionEditUtil.createRandomCaskEdits(i);
        long currentTimeMillis = System.currentTimeMillis();
        Configuration configuration = getConfiguration();
        FileSystem newInstance = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
        SequenceFile.Writer sequenceFileWriter = getSequenceFileWriter(configuration, newInstance, currentTimeMillis, b);
        AtomicLong atomicLong = new AtomicLong();
        HDFSTransactionLog hDFSTransactionLog = getHDFSTransactionLog(configuration, newInstance, currentTimeMillis);
        int i3 = 0;
        while (true) {
            int i4 = i3;
            if (i4 >= i - i2) {
                break;
            }
            if (b > 1) {
                writeNumWrites(sequenceFileWriter, i2);
            }
            for (int i5 = 0; i5 < i2; i5++) {
                AbstractTransactionLog.CaskEntry caskEntry = new AbstractTransactionLog.CaskEntry(new LongWritable(atomicLong.getAndIncrement()), createRandomCaskEdits.get(i5));
                sequenceFileWriter.append(caskEntry.getKey(), caskEntry.getEdit());
            }
            sequenceFileWriter.syncFs();
            i3 = i4 + i2;
        }
        if (b > 1) {
            writeNumWrites(sequenceFileWriter, i2);
        }
        for (int i6 = i - i2; i6 < i - 1; i6++) {
            AbstractTransactionLog.CaskEntry caskEntry2 = new AbstractTransactionLog.CaskEntry(new LongWritable(atomicLong.getAndIncrement()), createRandomCaskEdits.get(i6));
            sequenceFileWriter.append(caskEntry2.getKey(), caskEntry2.getEdit());
        }
        AbstractTransactionLog.CaskEntry caskEntry3 = new AbstractTransactionLog.CaskEntry(new LongWritable(atomicLong.getAndIncrement()), createRandomCaskEdits.get(i - 1));
        if (z) {
            sequenceFileWriter.append(caskEntry3.getKey(), caskEntry3.getEdit());
        } else {
            byte[] byteArray = Longs.toByteArray(caskEntry3.getKey().get());
            sequenceFileWriter.appendRaw(byteArray, 0, byteArray.length, new SequenceFile.ValueBytes() { // from class: org.apache.tephra.persist.HDFSTransactionLogTest.1
                public void writeUncompressedBytes(DataOutputStream dataOutputStream) throws IOException {
                    dataOutputStream.write(new byte[]{2}, 0, 1);
                }

                public void writeCompressedBytes(DataOutputStream dataOutputStream) throws IllegalArgumentException, IOException {
                }

                public int getSize() {
                    return 12;
                }
            });
        }
        sequenceFileWriter.syncFs();
        Closeables.closeQuietly(sequenceFileWriter);
        int i7 = 0;
        while (hDFSTransactionLog.getReader().next() != null) {
            i7++;
        }
        if (z) {
            Assert.assertEquals(i, i7);
        } else {
            Assert.assertEquals(i - i2, i7);
        }
    }

    private void testTransactionLogSync(int i, int i2, byte b, boolean z) throws Exception {
        List<TransactionEdit> createRandomEdits = TransactionEditUtil.createRandomEdits(i);
        long currentTimeMillis = System.currentTimeMillis();
        Configuration configuration = getConfiguration();
        configuration.set("data.tx.log.slow.append.threshold", "0");
        FileSystem newInstance = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
        SequenceFile.Writer sequenceFileWriter = getSequenceFileWriter(configuration, newInstance, currentTimeMillis, b);
        AtomicLong atomicLong = new AtomicLong();
        HDFSTransactionLog hDFSTransactionLog = getHDFSTransactionLog(configuration, newInstance, currentTimeMillis);
        int i3 = 0;
        while (true) {
            int i4 = i3;
            if (i4 >= i - i2) {
                break;
            }
            writeNumWrites(sequenceFileWriter, i2);
            for (int i5 = 0; i5 < i2; i5++) {
                AbstractTransactionLog.Entry entry = new AbstractTransactionLog.Entry(new LongWritable(atomicLong.getAndIncrement()), createRandomEdits.get(i5));
                sequenceFileWriter.append(entry.getKey(), entry.getEdit());
            }
            sequenceFileWriter.syncFs();
            i3 = i4 + i2;
        }
        writeNumWrites(sequenceFileWriter, i2);
        for (int i6 = i - i2; i6 < i - 1; i6++) {
            AbstractTransactionLog.Entry entry2 = new AbstractTransactionLog.Entry(new LongWritable(atomicLong.getAndIncrement()), createRandomEdits.get(i6));
            sequenceFileWriter.append(entry2.getKey(), entry2.getEdit());
        }
        AbstractTransactionLog.Entry entry3 = new AbstractTransactionLog.Entry(new LongWritable(atomicLong.getAndIncrement()), createRandomEdits.get(i - 1));
        if (z) {
            sequenceFileWriter.append(entry3.getKey(), entry3.getEdit());
        } else {
            byte[] byteArray = Longs.toByteArray(entry3.getKey().get());
            sequenceFileWriter.appendRaw(byteArray, 0, byteArray.length, new SequenceFile.ValueBytes() { // from class: org.apache.tephra.persist.HDFSTransactionLogTest.2
                public void writeUncompressedBytes(DataOutputStream dataOutputStream) throws IOException {
                    dataOutputStream.write(new byte[]{2}, 0, 1);
                }

                public void writeCompressedBytes(DataOutputStream dataOutputStream) throws IllegalArgumentException, IOException {
                }

                public int getSize() {
                    return 12;
                }
            });
        }
        sequenceFileWriter.syncFs();
        Closeables.closeQuietly(sequenceFileWriter);
        int i7 = 0;
        while (hDFSTransactionLog.getReader().next() != null) {
            i7++;
        }
        if (z) {
            Assert.assertEquals(i, i7);
        } else {
            Assert.assertEquals(i - i2, i7);
        }
    }

    @Test
    public void testTransactionLogVersion3() throws Exception {
        testTransactionLogSync(1000, 1, (byte) 3, false);
        testTransactionLogSync(TestHCM.SleepLongerAtFirstCoprocessor.SLEEP_TIME, 5, (byte) 3, false);
        testTransactionLogSync(1000, 1, (byte) 3, true);
        testTransactionLogSync(TestHCM.SleepLongerAtFirstCoprocessor.SLEEP_TIME, 5, (byte) 3, true);
    }

    @Test
    public void testTransactionLogVersion2() throws Exception {
        testCaskTransactionLogSync(1000, 1, (byte) 2, false);
        testCaskTransactionLogSync(TestHCM.SleepLongerAtFirstCoprocessor.SLEEP_TIME, 5, (byte) 2, false);
        testCaskTransactionLogSync(1000, 1, (byte) 2, true);
        testCaskTransactionLogSync(TestHCM.SleepLongerAtFirstCoprocessor.SLEEP_TIME, 5, (byte) 2, true);
    }

    @Test
    public void testTransactionLogOldVersion() throws Exception {
        testCaskTransactionLogSync(1000, 1, (byte) 1, false);
        testCaskTransactionLogSync(TestHCM.SleepLongerAtFirstCoprocessor.SLEEP_TIME, 5, (byte) 1, true);
    }
}
