/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs.avro;

import io.confluent.common.utils.MockTime;
import io.confluent.common.utils.Time;
import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
import io.confluent.connect.hdfs.TopicPartitionWriterTest;
import io.confluent.connect.hdfs.avro.AvroDataFileReader;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.wal.FSWAL;
import io.confluent.connect.hdfs.wal.WAL;
import io.confluent.connect.hdfs.wal.WALFile;
import io.confluent.connect.hdfs.wal.WALFileTest;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataWriterAvroTest
extends TestWithMiniDFSCluster {
    private static final Logger log = LoggerFactory.getLogger(DataWriterAvroTest.class);

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.dataFileReader = new AvroDataFileReader();
        this.extension = ".avro";
    }

    @Test
    public void testWriteRecord() throws Exception {
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> sinkRecords = this.createSinkRecords(7);
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{0L, 3L, 6L};
        this.verify(sinkRecords, validOffsets);
    }

    @Test
    public void testRecovery() throws Exception {
        String topicsDir = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        fs.delete(new Path(FileUtils.directoryName((String)this.url, (String)topicsDir, (TopicPartition)TOPIC_PARTITION)), true);
        HdfsStorage storage = new HdfsStorage(this.connectorConfig, this.url);
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        WAL wal = storage.wal(this.logsDir, TOPIC_PARTITION);
        wal.append("BEGIN", "");
        for (int i = 0; i < 5; ++i) {
            long startOffset = i * 10;
            long endOffset = (i + 1) * 10 - 1;
            String tempfile = FileUtils.tempFileName((String)this.url, (String)topicsDir, (String)this.getDirectory(), (String)this.extension);
            fs.createNewFile(new Path(tempfile));
            String committedFile = FileUtils.committedFileName((String)this.url, (String)topicsDir, (String)this.getDirectory(), (TopicPartition)TOPIC_PARTITION, (long)startOffset, (long)endOffset, (String)this.extension, (String)this.zeroPadFormat);
            wal.append(tempfile, committedFile);
        }
        wal.append("END", "");
        wal.close();
        hdfsWriter.recover(TOPIC_PARTITION);
        Map offsets = this.context.offsets();
        Assert.assertTrue((boolean)offsets.containsKey(TOPIC_PARTITION));
        Assert.assertEquals((long)50L, (long)((Long)offsets.get(TOPIC_PARTITION)));
        List<SinkRecord> sinkRecords = this.createSinkRecords(3, 50L);
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{0L, 10L, 20L, 30L, 40L, 50L, 53L};
        this.verifyFileListing(validOffsets, Collections.singleton(new TopicPartition("test-topic", 12)));
    }

    @Test
    public void testCorruptRecovery() throws Exception {
        String topicsDir = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        fs.delete(new Path(FileUtils.directoryName((String)this.url, (String)topicsDir, (TopicPartition)TOPIC_PARTITION)), true);
        final HdfsStorage storage = new HdfsStorage(this.connectorConfig, this.url);
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        FSWAL wal = new FSWAL(this.logsDir, TOPIC_PARTITION, storage){

            public void acquireLease() throws ConnectException {
                super.acquireLease();
                if (this.writer.getClass() != WALFileTest.CorruptWriter.class) {
                    try {
                        this.writer = new WALFileTest.CorruptWriter(storage.conf(), WALFile.Writer.file((Path)new Path(this.getLogFile())), WALFile.Writer.appendIfExists((boolean)true));
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        wal.append("BEGIN", "");
        for (int i = 0; i < 20; ++i) {
            long startOffset = i * 10;
            long endOffset = (i + 1) * 10 - 1;
            String tempfile = FileUtils.tempFileName((String)this.url, (String)topicsDir, (String)this.getDirectory(), (String)this.extension);
            fs.createNewFile(new Path(tempfile));
            String committedFile = FileUtils.committedFileName((String)this.url, (String)topicsDir, (String)this.getDirectory(), (TopicPartition)TOPIC_PARTITION, (long)startOffset, (long)endOffset, (String)this.extension, (String)this.zeroPadFormat);
            wal.append(tempfile, committedFile);
        }
        wal.append("END", "");
        wal.close();
        hdfsWriter.recover(TOPIC_PARTITION);
        Map offsets = this.context.offsets();
        Assert.assertFalse((boolean)offsets.containsKey(TOPIC_PARTITION));
    }

    @Test
    public void testWriteRecordMultiplePartitions() throws Exception {
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        for (TopicPartition tp : this.context.assignment()) {
            hdfsWriter.recover(tp);
        }
        List<SinkRecord> sinkRecords = this.createSinkRecords(7, 0L, this.context.assignment());
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{0L, 3L, 6L};
        this.verify(sinkRecords, validOffsets, this.context.assignment());
    }

    @Test
    public void testWriteInterleavedRecordsInMultiplePartitions() throws Exception {
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        for (TopicPartition tp : this.context.assignment()) {
            hdfsWriter.recover(tp);
        }
        List<SinkRecord> sinkRecords = this.createSinkRecordsInterleaved(7 * this.context.assignment().size(), 0L, this.context.assignment());
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{0L, 3L, 6L};
        this.verify(sinkRecords, validOffsets, this.context.assignment());
    }

    @Test
    public void testWriteInterleavedRecordsInMultiplePartitionsNonZeroInitialOffset() throws Exception {
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        List<SinkRecord> sinkRecords = this.createSinkRecordsInterleaved(7 * this.context.assignment().size(), 9L, this.context.assignment());
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{9L, 12L, 15L};
        this.verify(sinkRecords, validOffsets, this.context.assignment());
    }

    @Test
    public void testGetNextOffsets() throws Exception {
        String directory = "test-topic/partition=" + String.valueOf(12);
        String topicsDir = (String)this.topicsDir.get("test-topic");
        long[] startOffsets = new long[]{0L, 3L};
        long[] endOffsets = new long[]{2L, 5L};
        for (int i = 0; i < startOffsets.length; ++i) {
            Path path = new Path(FileUtils.committedFileName((String)this.url, (String)topicsDir, (String)directory, (TopicPartition)TOPIC_PARTITION, (long)startOffsets[i], (long)endOffsets[i], (String)this.extension, (String)this.zeroPadFormat));
            fs.createNewFile(path);
        }
        Path path = new Path(FileUtils.tempFileName((String)this.url, (String)topicsDir, (String)directory, (String)this.extension));
        fs.createNewFile(path);
        path = new Path(FileUtils.fileName((String)this.url, (String)topicsDir, (String)directory, (String)"abcd"));
        fs.createNewFile(path);
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        hdfsWriter.recover(TOPIC_PARTITION);
        Map committedOffsets = hdfsWriter.getCommittedOffsets();
        Assert.assertTrue((boolean)committedOffsets.containsKey(TOPIC_PARTITION));
        long nextOffset = (Long)committedOffsets.get(TOPIC_PARTITION);
        Assert.assertEquals((long)6L, (long)nextOffset);
        hdfsWriter.close();
        hdfsWriter.stop();
    }

    @Test
    public void testWriteRecordNonZeroInitialOffset() throws Exception {
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> sinkRecords = this.createSinkRecords(7, 3L);
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{3L, 6L, 9L};
        this.verify(sinkRecords, validOffsets);
    }

    @Test
    public void testRebalance() throws Exception {
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        HashSet<TopicPartition> originalAssignment = new HashSet<TopicPartition>(this.context.assignment());
        for (TopicPartition tp : originalAssignment) {
            hdfsWriter.recover(tp);
        }
        HashSet<TopicPartition> nextAssignment = new HashSet<TopicPartition>();
        nextAssignment.add(TOPIC_PARTITION);
        nextAssignment.add(TOPIC_PARTITION3);
        List<SinkRecord> sinkRecords = this.createSinkRecords(7, 0L, originalAssignment);
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        this.context.setAssignment(nextAssignment);
        hdfsWriter.open(nextAssignment);
        Assert.assertEquals(null, (Object)hdfsWriter.getBucketWriter(TOPIC_PARTITION2));
        Assert.assertNotNull((Object)hdfsWriter.getBucketWriter(TOPIC_PARTITION));
        Assert.assertNotNull((Object)hdfsWriter.getBucketWriter(TOPIC_PARTITION3));
        long[] validOffsetsTopicPartition2 = new long[]{0L, 3L, 6L};
        this.verify(sinkRecords, validOffsetsTopicPartition2, Collections.singleton(TOPIC_PARTITION2), true);
        sinkRecords = this.createSinkRecords(3, 6L, this.context.assignment());
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsetsTopicPartition1 = new long[]{6L, 9L};
        this.verify(sinkRecords, validOffsetsTopicPartition1, Collections.singleton(TOPIC_PARTITION), true);
        long[] validOffsetsTopicPartition3 = new long[]{6L, 9L};
        this.verify(sinkRecords, validOffsetsTopicPartition3, Collections.singleton(TOPIC_PARTITION3), true);
    }

    @Test
    public void testProjectBackWard() throws Exception {
        Map<String, String> props = this.createProps();
        props.put("flush.size", "2");
        props.put("schema.compatibility", "BACKWARD");
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> sinkRecords = this.createSinkRecordsWithAlternatingSchemas(7, 0L);
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{0L, 1L, 3L, 5L, 7L};
        this.verify(sinkRecords, validOffsets);
    }

    @Test
    public void testProjectNone() throws Exception {
        Map<String, String> props = this.createProps();
        props.put("flush.size", "2");
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> sinkRecords = this.createSinkRecordsWithAlternatingSchemas(7, 0L);
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{0L, 1L, 2L, 3L, 4L, 5L, 6L};
        this.verify(sinkRecords, validOffsets);
    }

    @Test
    public void testProjectForward() throws Exception {
        Map<String, String> props = this.createProps();
        props.put("flush.size", "2");
        props.put("schema.compatibility", "FORWARD");
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> sinkRecords = this.createSinkRecordsWithAlternatingSchemas(8, 0L).subList(1, 8);
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{1L, 2L, 4L, 6L, 8L};
        this.verify(sinkRecords, validOffsets);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProjectNoVersion() throws Exception {
        Map<String, String> props = this.createProps();
        props.put("schema.compatibility", "BACKWARD");
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> sinkRecords = this.createSinkRecordsNoVersion(1, 0L);
        sinkRecords.addAll(this.createSinkRecordsWithAlternatingSchemas(7, 0L));
        try {
            hdfsWriter.write(sinkRecords);
            Assert.fail((String)"Version is required for Backward compatibility.");
        }
        catch (RuntimeException validOffsets) {
        }
        finally {
            hdfsWriter.close();
            hdfsWriter.stop();
            long[] validOffsets = new long[]{};
            this.verify(Collections.emptyList(), validOffsets);
        }
    }

    @Test
    public void testFlushPartialFile() throws Exception {
        String ROTATE_INTERVAL_MS_CONFIG = "1000";
        long WAIT_TIME = Long.valueOf(ROTATE_INTERVAL_MS_CONFIG) * 2L;
        String FLUSH_SIZE_CONFIG = "10";
        int NUMBER_OF_RECORDS = Integer.valueOf(FLUSH_SIZE_CONFIG) + Integer.valueOf(FLUSH_SIZE_CONFIG) / 2;
        Map<String, String> props = this.createProps();
        props.put("flush.size", FLUSH_SIZE_CONFIG);
        props.put("rotate.interval.ms", ROTATE_INTERVAL_MS_CONFIG);
        props.put("partition.duration.ms", String.valueOf(TimeUnit.DAYS.toMillis(1L)));
        props.put("timestamp.extractor", TopicPartitionWriterTest.MockedWallclockTimestampExtractor.class.getName());
        props.put("partitioner.class", TimeBasedPartitioner.class.getName());
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
        this.context.assignment().add(TOPIC_PARTITION);
        MockTime time = TopicPartitionWriterTest.MockedWallclockTimestampExtractor.TIME;
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext)this.context, this.avroData, (Time)time);
        this.partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> sinkRecords = this.createSinkRecords(NUMBER_OF_RECORDS);
        hdfsWriter.write(sinkRecords);
        time.sleep(WAIT_TIME);
        hdfsWriter.write(new ArrayList());
        Map committedOffsets = hdfsWriter.getCommittedOffsets();
        Assert.assertTrue((boolean)committedOffsets.containsKey(TOPIC_PARTITION));
        long nextOffset = (Long)committedOffsets.get(TOPIC_PARTITION);
        Assert.assertEquals((long)NUMBER_OF_RECORDS, (long)nextOffset);
        hdfsWriter.close();
        hdfsWriter.stop();
    }

    @Test
    public void testAvroCompression() throws Exception {
        Map<String, String> props = this.createProps();
        props.put("avro.codec", "snappy");
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext)this.context, this.avroData);
        this.partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> sinkRecords = this.createSinkRecords(7);
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();
        long[] validOffsets = new long[]{0L, 3L, 6L};
        this.verify(sinkRecords, validOffsets);
        List<String> filenames = this.getExpectedFiles(validOffsets, TOPIC_PARTITION);
        for (String filename : filenames) {
            Path p = new Path(filename);
            FSDataInputStream stream = fs.open(p);
            Throwable throwable = null;
            try {
                String fileContents;
                int index;
                int size = (int)fs.getFileStatus(p).getLen();
                ByteBuffer buffer = ByteBuffer.allocate(size);
                if (stream.read(buffer) <= 0) {
                    log.error("Could not read file {}", (Object)filename);
                }
                Assert.assertTrue(((index = (fileContents = new String(buffer.array())).indexOf("avro.codec")) > 0 && fileContents.indexOf("snappy", index) > 0 ? 1 : 0) != 0);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (stream == null) continue;
                if (throwable != null) {
                    try {
                        stream.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                stream.close();
            }
        }
    }
}

