/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs.wal;

import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
import io.confluent.connect.hdfs.wal.CorruptWalFileException;
import io.confluent.connect.hdfs.wal.WALEntry;
import io.confluent.connect.hdfs.wal.WALFile;
import java.io.IOException;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Time;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Test;

public class WALFileTest
extends TestWithMiniDFSCluster {
    /**
     * Writes two entries to a new WAL file, reopens the same file with
     * {@code appendIfExists(true)} to add two more, and checks the file content
     * after each phase.
     *
     * <p>Each writer is closed in a {@code finally} block, and the file is registered
     * for deletion on exit even when an append or a verification step fails.
     *
     * @throws Exception if set-up, writing, or verification fails
     */
    @Test
    public void testAppend() throws Exception {
        this.setUp();
        this.properties.put("topic.capture.groups.regex", "(.*)");
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(this.properties);
        String topic = "topic";
        String topicsDir = connectorConfig.getTopicsDirFromTopic(topic);
        TopicPartition topicPart = new TopicPartition(topic, 0);
        Path file = new Path(FileUtils.logFileName(this.url, topicsDir, topicPart));
        try {
            // Phase 1: create the file and write the first two entries.
            WALFile.Writer writer = WALFile.createWriter(
                    connectorConfig,
                    new WALFile.Writer.Option[]{WALFile.Writer.file(file)});
            try {
                writer.append(new WALEntry("key1"), new WALEntry("val1"));
                writer.append(new WALEntry("key2"), new WALEntry("val2"));
            } finally {
                writer.close();
            }
            this.verify2Values(file);

            // Phase 2: reopen the existing file in append mode and add two more entries.
            WALFile.Writer appendingWriter = WALFile.createWriter(
                    connectorConfig,
                    new WALFile.Writer.Option[]{
                        WALFile.Writer.file(file),
                        WALFile.Writer.appendIfExists(true)});
            try {
                appendingWriter.append(new WALEntry("key3"), new WALEntry("val3"));
                appendingWriter.append(new WALEntry("key4"), new WALEntry("val4"));
                appendingWriter.hsync();
            } finally {
                appendingWriter.close();
            }
            this.verifyAll4Values(file);
        } finally {
            // Registered in finally so a failed run does not leave the file behind.
            fs.deleteOnExit(file);
        }
    }

    /**
     * Asserts that {@code file} yields the pairs key1/val1 and key2/val2, in that
     * order, and that the next read after them returns {@code null}.
     *
     * <p>The reader is closed in a {@code finally} block so that a failed assertion
     * does not leave it open.
     *
     * @param file path of the WAL file to read
     * @throws IOException propagated from the reader
     */
    private void verify2Values(Path file) throws IOException {
        String[][] expectedPairs = {{"key1", "val1"}, {"key2", "val2"}};
        WALFile.Reader reader = new WALFile.Reader(
                this.conf,
                new WALFile.Reader.Option[]{WALFile.Reader.file(file)});
        try {
            for (String[] pair : expectedPairs) {
                // Names are compared through WALEntry.getName() on both sides.
                Assert.assertEquals(new WALEntry(pair[0]).getName(), reader.next((WALEntry) null).getName());
                Assert.assertEquals(new WALEntry(pair[1]).getName(), reader.getCurrentValue(null).getName());
            }
            Assert.assertNull(reader.next((WALEntry) null));
        } finally {
            reader.close();
        }
    }

    /**
     * Asserts that {@code file} yields the pairs key1/val1 through key4/val4, in
     * that order, and that the next read after them returns {@code null}.
     *
     * <p>The reader is closed in a {@code finally} block so that a failed assertion
     * does not leave it open.
     *
     * @param file path of the WAL file to read
     * @throws IOException propagated from the reader
     */
    private void verifyAll4Values(Path file) throws IOException {
        String[][] expectedPairs = {
            {"key1", "val1"},
            {"key2", "val2"},
            {"key3", "val3"},
            {"key4", "val4"},
        };
        WALFile.Reader reader = new WALFile.Reader(
                this.conf,
                new WALFile.Reader.Option[]{WALFile.Reader.file(file)});
        try {
            for (String[] pair : expectedPairs) {
                // Names are compared through WALEntry.getName() on both sides.
                Assert.assertEquals(new WALEntry(pair[0]).getName(), reader.next((WALEntry) null).getName());
                Assert.assertEquals(new WALEntry(pair[1]).getName(), reader.getCurrentValue(null).getName());
            }
            Assert.assertNull(reader.next((WALEntry) null));
        } finally {
            reader.close();
        }
    }

    /**
     * Writes 350 key/val entries through a {@link CorruptWriter} and expects reading
     * the file back to fail with {@link CorruptWalFileException}.
     *
     * <p>The writer is closed and the file is registered for deletion on exit on
     * every path, including when an append fails. A read that completes without the
     * expected exception is reported through {@code Assert.fail}.
     *
     * @throws Exception if set-up or writing fails
     */
    @Test
    public void testCorruptReadDoesThrowException() throws Exception {
        this.setUp();
        this.properties.put("topic.capture.groups.regex", "(.*)");
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(this.properties);
        String topic = "topic";
        String topicsDir = connectorConfig.getTopicsDirFromTopic(topic);
        TopicPartition topicPart = new TopicPartition(topic, 0);
        Path file = new Path(FileUtils.logFileName(this.url, topicsDir, topicPart));
        try {
            CorruptWriter writer = new CorruptWriter(connectorConfig, WALFile.Writer.file(file));
            try {
                // NOTE(review): 350 is presumably enough entries for the writer to emit
                // sync bytes into the file - confirm against WALFile.Writer.
                for (int i = 0; i < 350; ++i) {
                    writer.append(new WALEntry("key"), new WALEntry("val"));
                }
            } finally {
                writer.close();
            }
            try {
                this.readAllValues(file);
            } catch (CorruptWalFileException expected) {
                // This is the outcome the test is looking for.
                return;
            }
            Assert.fail("should have thrown CorruptWALFileException");
        } finally {
            fs.deleteOnExit(file);
        }
    }

    /**
     * Reads every key/value pair from {@code file} and discards them, then closes
     * the reader.
     *
     * <p>If reading fails, the reader is still closed and the original failure is
     * rethrown unchanged. A failure while closing is attached to it as a suppressed
     * exception, so callers that catch a specific read exception still see it.
     *
     * @param file path of the WAL file to read
     * @throws IOException propagated from the reader
     */
    private void readAllValues(Path file) throws IOException {
        WALFile.Reader reader = new WALFile.Reader(
                this.conf,
                new WALFile.Reader.Option[]{WALFile.Reader.file(file)});
        WALEntry key = new WALEntry();
        WALEntry value = new WALEntry();
        try {
            while (reader.next((Writable) key, (Writable) value)) {
                // Entries are discarded; only the act of reading matters.
            }
        } catch (Exception readFailure) {
            // Caught broadly only to close the reader; rethrown as-is below.
            try {
                reader.close();
            } catch (Exception closeFailure) {
                readFailure.addSuppressed(closeFailure);
            }
            throw readFailure;
        }
        reader.close();
    }

    /**
     * Shuts down {@code cluster}, attempts to create a WAL writer, and asserts that
     * the value reported by {@code getFileSystemCacheSize()} is the same before and
     * after the attempt.
     *
     * @throws Exception if set-up or the cluster shutdown fails
     */
    @Test
    public void testHdfsIsDown() throws Exception {
        this.setUp();
        HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(this.properties);
        String logsRoot = config.getString("topics.dir");
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Path walPath = new Path(FileUtils.logFileName(this.url, logsRoot, topicPartition));

        cluster.shutdown();

        int cacheSizeBeforeAttempt = this.getFileSystemCacheSize();
        try {
            WALFile.createWriter(config, new WALFile.Writer.Option[]{WALFile.Writer.file(walPath)});
        } catch (Exception ignored) {
            // The outcome of the attempt is not checked; only the cache size below is.
        }
        int cacheSizeAfterAttempt = this.getFileSystemCacheSize();
        Assert.assertEquals((long) cacheSizeBeforeAttempt, (long) cacheSizeAfterAttempt);
    }

    public static class CorruptWriter
    extends WALFile.Writer {
        public CorruptWriter(HdfsSinkConnectorConfig connectorConfig, WALFile.Writer.Option ... opts) throws IOException {
            super(connectorConfig, opts);
        }

        public void changeSync() {
            MessageDigest digester;
            try {
                digester = MessageDigest.getInstance("MD5");
            }
            catch (NoSuchAlgorithmException ex) {
                return;
            }
            long time = Time.now();
            digester.update((new UID() + "@" + time).getBytes(Charsets.UTF_8));
            this.sync = digester.digest();
        }

        public synchronized void append(WALEntry key, WALEntry val) throws IOException {
            super.append(key, val);
            this.changeSync();
        }
    }
}

