/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.DataFileReader;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.HdfsSinkTask;
import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
import io.confluent.connect.hdfs.avro.AvroDataFileReader;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.wal.WAL;
import io.confluent.connect.storage.StorageFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link HdfsSinkTask} that run against the fixtures inherited from
 * {@link TestWithMiniDFSCluster}.
 *
 * <p>The tests cover two areas: the offsets the task reports to its context after
 * {@code start} (derived from committed files and write-ahead logs), and the committed
 * Avro files produced by {@code put}.
 */
public class HdfsSinkTaskTest extends TestWithMiniDFSCluster {
    private static final String DIRECTORY1 = "test-topic/partition=" + 12;
    private static final String DIRECTORY2 = "test-topic/partition=" + 13;
    private static final String EXTENSION = ".avro";
    private static final String ZERO_PAD_FMT = "%010d";
    private final DataFileReader schemaFileReader = new AvroDataFileReader();

    /**
     * With committed files ending at offsets 20 and 45 in place, the started task must
     * report 21 and 46 as the offsets for the two partitions.
     */
    @Test
    public void testSinkTaskStart() throws Exception {
        this.setUp();
        this.createCommittedFiles();
        HdfsSinkTask task = this.startTask();
        try {
            Map<TopicPartition, Long> offsets = this.context.offsets();
            Assert.assertEquals(2L, offsets.size());
            Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION));
            Assert.assertEquals(21L, (long) offsets.get(TOPIC_PARTITION));
            Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION2));
            Assert.assertEquals(46L, (long) offsets.get(TOPIC_PARTITION2));
        } finally {
            // Stop the task even when an assertion fails so it is not left running.
            task.stop();
        }
    }

    /**
     * Writes two batches of records and closes a {@link FileSystem} obtained through
     * {@code FileSystem.get} in between; all expected committed files must still be
     * readable afterwards.
     */
    @Test
    public void testSinkTaskFileSystemIsolation() throws Exception {
        this.setUp();
        this.createCommittedFiles();
        String key = "key";
        Schema schema = this.createSchema();
        Struct record = this.createRecord(schema);
        List<SinkRecord> sinkRecordsA = this.createSinkRecords(key, schema, record, 0L, 7L);
        List<SinkRecord> sinkRecordsB = this.createSinkRecords(key, schema, record, 7L, 16L);

        HdfsSinkTask task = this.startTask();
        task.put(sinkRecordsA);
        // Closing this instance between the batches must not break the second put.
        // NOTE(review): presumably this targets Hadoop's shared FileSystem cache - confirm.
        FileSystem.get(new URI(this.connectorConfig.getString("hdfs.url")), this.connectorConfig.getHadoopConfiguration()).close();
        task.put(sinkRecordsB);
        task.stop();

        AvroData avroData = task.getAvroData();
        long[] validOffsets = new long[]{-1L, 2L, 5L, 8L, 11L, 14L};
        this.verifyCommittedFiles(validOffsets, avroData.fromConnectData(schema, record));
    }

    /**
     * Without any committed files or logs, the started task must not report any offsets.
     */
    @Test
    public void testSinkTaskStartNoCommittedFiles() throws Exception {
        this.setUp();
        HdfsSinkTask task = this.startTask();
        try {
            Map<TopicPartition, Long> offsets = this.context.offsets();
            Assert.assertEquals(0L, offsets.size());
        } finally {
            task.stop();
        }
    }

    /**
     * With a write-ahead log for a single partition whose committed names end at
     * offset 300, the started task must report offset 301 for that partition only.
     */
    @Test
    public void testSinkTaskStartSomeCommittedFiles() throws Exception {
        this.setUp();
        String topicsDir = (String) this.topicsDir.get(TOPIC_PARTITION.topic());

        Map<TopicPartition, List<String>> tempfiles = new HashMap<TopicPartition, List<String>>();
        tempfiles.put(TOPIC_PARTITION, this.twoTempFileNames(topicsDir, DIRECTORY1));

        Map<TopicPartition, List<String>> committedFiles = new HashMap<TopicPartition, List<String>>();
        List<String> committed = new ArrayList<String>();
        committed.add(this.committedFile(topicsDir, DIRECTORY1, TOPIC_PARTITION, 100L, 200L));
        committed.add(this.committedFile(topicsDir, DIRECTORY1, TOPIC_PARTITION, 201L, 300L));
        committedFiles.put(TOPIC_PARTITION, committed);

        this.createTempFiles(tempfiles);
        this.createWALs(tempfiles, committedFiles);

        HdfsSinkTask task = this.startTask();
        try {
            Map<TopicPartition, Long> offsets = this.context.offsets();
            Assert.assertEquals(1L, offsets.size());
            Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION));
            Assert.assertEquals(301L, (long) offsets.get(TOPIC_PARTITION));
        } finally {
            task.stop();
        }
    }

    /**
     * With write-ahead logs for both partitions (committed names ending at offsets 300
     * and 800), the started task must report offsets 301 and 801.
     */
    @Test
    public void testSinkTaskStartWithRecovery() throws Exception {
        this.setUp();
        String topicsDir1 = (String) this.topicsDir.get(TOPIC_PARTITION.topic());
        String topicsDir2 = (String) this.topicsDir.get(TOPIC_PARTITION2.topic());

        Map<TopicPartition, List<String>> tempfiles = new HashMap<TopicPartition, List<String>>();
        tempfiles.put(TOPIC_PARTITION, this.twoTempFileNames(topicsDir1, DIRECTORY1));
        tempfiles.put(TOPIC_PARTITION2, this.twoTempFileNames(topicsDir2, DIRECTORY2));

        Map<TopicPartition, List<String>> committedFiles = new HashMap<TopicPartition, List<String>>();
        List<String> committed1 = new ArrayList<String>();
        committed1.add(this.committedFile(topicsDir1, DIRECTORY1, TOPIC_PARTITION, 100L, 200L));
        committed1.add(this.committedFile(topicsDir1, DIRECTORY1, TOPIC_PARTITION, 201L, 300L));
        committedFiles.put(TOPIC_PARTITION, committed1);
        List<String> committed2 = new ArrayList<String>();
        committed2.add(this.committedFile(topicsDir2, DIRECTORY2, TOPIC_PARTITION2, 400L, 500L));
        committed2.add(this.committedFile(topicsDir2, DIRECTORY2, TOPIC_PARTITION2, 501L, 800L));
        committedFiles.put(TOPIC_PARTITION2, committed2);

        this.createTempFiles(tempfiles);
        this.createWALs(tempfiles, committedFiles);

        HdfsSinkTask task = this.startTask();
        try {
            Map<TopicPartition, Long> offsets = this.context.offsets();
            Assert.assertEquals(2L, offsets.size());
            Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION));
            Assert.assertEquals(301L, (long) offsets.get(TOPIC_PARTITION));
            Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION2));
            Assert.assertEquals(801L, (long) offsets.get(TOPIC_PARTITION2));
        } finally {
            task.stop();
        }
    }

    /**
     * Puts seven struct records (offsets 0..6) per assigned partition and checks the
     * committed files covering offsets 0..2 and 3..5.
     */
    @Test
    public void testSinkTaskPut() throws Exception {
        this.setUp();
        String key = "key";
        Schema schema = this.createSchema();
        Struct record = this.createRecord(schema);
        List<SinkRecord> sinkRecords = this.createSinkRecords(key, schema, record, 0L, 7L);

        HdfsSinkTask task = this.startTask();
        task.put(sinkRecords);
        task.stop();

        AvroData avroData = task.getAvroData();
        long[] validOffsets = new long[]{-1L, 2L, 5L};
        this.verifyCommittedFiles(validOffsets, avroData.fromConnectData(schema, record));
    }

    /**
     * Same as {@link #testSinkTaskPut()} but with a primitive INT32 value instead of a
     * struct; every record read back must equal the written integer.
     */
    @Test
    public void testSinkTaskPutPrimitive() throws Exception {
        this.setUp();
        String key = "key";
        Schema schema = Schema.INT32_SCHEMA;
        int record = 12;
        List<SinkRecord> sinkRecords = this.createSinkRecords(key, schema, record, 0L, 7L);

        HdfsSinkTask task = this.startTask();
        task.put(sinkRecords);
        task.stop();

        long[] validOffsets = new long[]{-1L, 2L, 5L};
        this.verifyCommittedFiles(validOffsets, (Object) record);
    }

    /** Creates a task, initializes it with the test context and starts it with the test properties. */
    private HdfsSinkTask startTask() {
        HdfsSinkTask task = new HdfsSinkTask();
        task.initialize((SinkTaskContext) this.context);
        task.start(this.properties);
        return task;
    }

    /**
     * Builds one record per offset in {@code [startOffset, endOffset)} for every assigned
     * partition, all carrying the same key and value.
     */
    private List<SinkRecord> createSinkRecords(String key, Schema valueSchema, Object value, long startOffset, long endOffset) {
        List<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        for (TopicPartition tp : this.context.assignment()) {
            for (long offset = startOffset; offset < endOffset; ++offset) {
                sinkRecords.add(new SinkRecord(tp.topic(), tp.partition(), Schema.STRING_SCHEMA, key, valueSchema, value, offset));
            }
        }
        return sinkRecords;
    }

    /**
     * Reads the committed file of every assigned partition for each offset range described by
     * {@code validOffsets} and checks both the record count and the record contents.
     *
     * @param validOffsets boundaries of the expected files; each adjacent pair {@code (a, b)}
     *     stands for a file covering offsets {@code a + 1} through {@code b}
     * @param expectedRecord the value every record read back must equal
     */
    private void verifyCommittedFiles(long[] validOffsets, Object expectedRecord) throws Exception {
        for (TopicPartition tp : this.context.assignment()) {
            String directory = tp.topic() + "/partition=" + tp.partition();
            String topicsDir = (String) this.topicsDir.get(tp.topic());
            for (int j = 1; j < validOffsets.length; ++j) {
                long startOffset = validOffsets[j - 1] + 1L;
                long endOffset = validOffsets[j];
                Path path = new Path(this.committedFile(topicsDir, directory, tp, startOffset, endOffset));
                Collection<Object> records = this.schemaFileReader.readData(this.connectorConfig.getHadoopConfiguration(), path);
                long expectedSize = endOffset - startOffset + 1L;
                Assert.assertEquals(expectedSize, records.size());
                for (Object avroRecord : records) {
                    Assert.assertEquals(expectedRecord, avroRecord);
                }
            }
        }
    }

    /** Returns the committed file name for the given location and offset range. */
    private String committedFile(String topicsDir, String directory, TopicPartition tp, long startOffset, long endOffset) {
        return FileUtils.committedFileName((String) this.url, topicsDir, directory, tp, startOffset, endOffset, EXTENSION, ZERO_PAD_FMT);
    }

    /** Returns two temp file names, each from its own {@code FileUtils.tempFileName} call. */
    private List<String> twoTempFileNames(String topicsDir, String directory) {
        List<String> names = new ArrayList<String>();
        names.add(FileUtils.tempFileName((String) this.url, topicsDir, directory, EXTENSION));
        names.add(FileUtils.tempFileName((String) this.url, topicsDir, directory, EXTENSION));
        return names;
    }

    /** Creates an empty file for every temp file name in the given map. */
    private void createTempFiles(Map<TopicPartition, List<String>> tempfiles) throws IOException {
        for (List<String> files : tempfiles.values()) {
            for (String file : files) {
                fs.createNewFile(new Path(file));
            }
        }
    }

    /**
     * Creates four empty committed files: offsets 0-10 and 11-20 named for
     * {@code TOPIC_PARTITION}, and offsets 21-40 and 41-45 named for {@code TOPIC_PARTITION2}.
     */
    private void createCommittedFiles() throws IOException {
        String topicsDir = (String) this.topicsDir.get(TOPIC_PARTITION.topic());
        // NOTE(review): the TOPIC_PARTITION2 files are also placed under DIRECTORY1;
        // confirm whether DIRECTORY2 was intended before changing this.
        String[] files = new String[]{
            this.committedFile(topicsDir, DIRECTORY1, TOPIC_PARTITION, 0L, 10L),
            this.committedFile(topicsDir, DIRECTORY1, TOPIC_PARTITION, 11L, 20L),
            this.committedFile(topicsDir, DIRECTORY1, TOPIC_PARTITION2, 21L, 40L),
            this.committedFile(topicsDir, DIRECTORY1, TOPIC_PARTITION2, 41L, 45L),
        };
        for (String file : files) {
            fs.createNewFile(new Path(file));
        }
    }

    /**
     * Writes one write-ahead log per partition in {@code tempfiles}: a BEGIN marker, one
     * entry pairing each temp file with the committed name at the same index, and an END marker.
     *
     * @param tempfiles temp file names per partition
     * @param committedFiles committed file names per partition, index-aligned with {@code tempfiles}
     * @throws IllegalArgumentException if a partition has fewer committed names than temp files
     */
    private void createWALs(Map<TopicPartition, List<String>> tempfiles, Map<TopicPartition, List<String>> committedFiles) throws Exception {
        Class<?> storageClass = this.connectorConfig.getClass("storage.class");
        @SuppressWarnings({"unchecked", "rawtypes"})
        HdfsStorage storage = (HdfsStorage) StorageFactory.createStorage((Class) storageClass, HdfsSinkConnectorConfig.class, this.connectorConfig, (String) this.url);
        for (Map.Entry<TopicPartition, List<String>> entry : tempfiles.entrySet()) {
            TopicPartition tp = entry.getKey();
            List<String> tempList = entry.getValue();
            List<String> committedList = committedFiles.get(tp);
            // Fail with a clear message instead of an NPE or index error part-way through the log.
            if (committedList == null || committedList.size() < tempList.size()) {
                throw new IllegalArgumentException("Need one committed file name per temp file for " + tp);
            }
            WAL wal = storage.wal(this.logsDir, tp);
            try {
                wal.append("BEGIN", "");
                for (int i = 0; i < tempList.size(); ++i) {
                    wal.append(tempList.get(i), committedList.get(i));
                }
                wal.append("END", "");
            } finally {
                wal.close();
            }
        }
    }
}

