/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs;

import io.confluent.common.utils.MockTime;
import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.RecordWriterProvider;
import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
import io.confluent.connect.hdfs.TopicPartitionWriter;
import io.confluent.connect.hdfs.avro.AvroDataFileReader;
import io.confluent.connect.hdfs.filter.CommittedFileFilter;
import io.confluent.connect.hdfs.partitioner.DefaultPartitioner;
import io.confluent.connect.hdfs.partitioner.FieldPartitioner;
import io.confluent.connect.hdfs.partitioner.Partitioner;
import io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner;
import io.confluent.connect.hdfs.partitioner.TimeUtils;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.storage.Storage;
import io.confluent.connect.hdfs.wal.FSWAL;
import io.confluent.connect.hdfs.wal.WALFile;
import io.confluent.connect.hdfs.wal.WALFileTest;
import io.confluent.connect.storage.StorageFactory;
import io.confluent.connect.storage.format.Format;
import io.confluent.connect.storage.partitioner.DailyPartitioner;
import io.confluent.connect.storage.partitioner.HourlyPartitioner;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Assert;
import org.junit.Test;

public class TopicPartitionWriterTest
extends TestWithMiniDFSCluster {
    // Writer provider from the io.confluent.connect.hdfs package; this class only ever assigns null to it.
    private RecordWriterProvider writerProvider = null;
    // Storage-common writer provider, obtained from the configured format in setUp().
    private io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig> newWriterProvider;
    // Storage instance created in setUp() from the configured "storage.class".
    private HdfsStorage storage;
    // Per-test property overrides that createProps() merges on top of the base properties.
    private Map<String, String> localProps = new HashMap<String, String>();
    // Mock clock re-created by setUp() and passed to most TopicPartitionWriter instances.
    private MockTime time;

    /**
     * Returns the base properties from the parent fixture with this test's
     * {@code localProps} overrides applied on top.
     */
    @Override
    protected Map<String, String> createProps() {
        Map<String, String> merged = super.createProps();
        // Overrides win: putAll replaces any key already supplied by the parent.
        merged.putAll(this.localProps);
        return merged;
    }

    /**
     * Prepares the fixture used by every test: a fresh mock clock, the storage and
     * format configured under {@code storage.class} / {@code format.class}, the Avro
     * reader and file extension used for verification, and the topic and logs
     * directories on the test file system.
     *
     * @throws Exception if the parent set-up, reflection, or directory creation fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.time = new MockTime();

        // Storage is instantiated through the factory from the configured class.
        Class storageType = this.connectorConfig.getClass("storage.class");
        this.storage = (HdfsStorage)StorageFactory.createStorage((Class)storageType, HdfsSinkConnectorConfig.class, (Object)this.connectorConfig, (String)this.url);

        // The format is built reflectively through its (HdfsStorage) constructor.
        Class formatType = this.connectorConfig.getClass("format.class");
        Format configuredFormat = (Format)formatType.getConstructor(HdfsStorage.class).newInstance(this.storage);

        this.writerProvider = null;
        this.newWriterProvider = configuredFormat.getRecordWriterProvider();
        this.dataFileReader = new AvroDataFileReader();
        this.extension = this.newWriterProvider.getExtension();

        String topicRoot = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        this.createTopicDir(this.url, topicRoot, "test-topic");
        this.createLogsDir(this.url, this.logsDir);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testVariablyIncreasingOffsets() throws Exception {
        void var12_18;
        this.setUp();
        FieldPartitioner partitioner = new FieldPartitioner();
        partitioner.configure(this.parsedConfig);
        List partitionFields = (List)this.parsedConfig.get("partition.field.name");
        String partitionField = (String)partitionFields.get(0);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, (Partitioner)partitioner, this.connectorConfig, (SinkTaskContext)this.context, this.avroData, (io.confluent.common.utils.Time)this.time);
        String key = "key";
        ArrayList<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        Schema schema = this.createSchema();
        ArrayList<Struct> records = new ArrayList<Struct>();
        int offset = 0;
        for (int i = 0; i < 3; ++i) {
            for (int j = 0; j < 3; ++j) {
                Struct struct = this.createRecord(schema, j, 12.2f);
                records.add(struct);
                sinkRecords.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, (Object)key, schema, (Object)struct, (long)offset));
                offset += 10;
            }
        }
        Struct struct = this.createRecord(schema);
        sinkRecords.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, (Object)key, schema, (Object)struct, (long)offset));
        records.add(struct);
        Assert.assertEquals((long)10L, (long)records.size());
        for (SinkRecord sinkRecord : sinkRecords) {
            topicPartitionWriter.buffer(sinkRecord);
        }
        Assert.assertEquals((long)-1L, (long)topicPartitionWriter.offset());
        topicPartitionWriter.recover();
        Assert.assertEquals((long)-1L, (long)topicPartitionWriter.offset());
        topicPartitionWriter.write();
        Assert.assertEquals((long)81L, (long)topicPartitionWriter.offset());
        topicPartitionWriter.close();
        Assert.assertEquals((long)81L, (long)topicPartitionWriter.offset());
        HashSet<Path> expectedFiles = new HashSet<Path>();
        boolean bl = false;
        while (var12_18 < records.size() - 1) {
            String directory = partitioner.generatePartitionedPath("test-topic", partitionField + "=" + ((Struct)records.get((int)var12_18)).get("int"));
            expectedFiles.add(new Path(FileUtils.committedFileName((String)this.url, (String)((String)this.topicsDir.get("test-topic")), (String)directory, (TopicPartition)TOPIC_PARTITION, (long)((long)(var12_18 * 10)), (long)((long)(var12_18 * 10)), (String)this.extension, (String)this.zeroPadFormat)));
            ++var12_18;
        }
        records.sort(Comparator.comparingInt(s -> (Integer)s.get("int")));
        int n = 1;
        this.verify(expectedFiles, n, records, schema);
        topicPartitionWriter.recover();
        Assert.assertEquals((long)81L, (long)topicPartitionWriter.offset());
    }

    /**
     * Writes ten records through a {@link DefaultPartitioner} with
     * {@code filename.offset.zero.pad.width} set to 2 and expects three committed
     * files of three records each, whose names carry two-digit offsets.
     */
    @Test
    public void testWriteRecordDefaultWithPadding() throws Exception {
        this.localProps.put("filename.offset.zero.pad.width", "2");
        this.setUp();

        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
        defaultPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter writer = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, defaultPartitioner, this.connectorConfig, this.context, this.avroData, this.time);

        Schema schema = this.createSchema();
        List<Struct> records = this.createRecordBatches(schema, 3, 3);
        records.add(this.createRecord(schema));
        for (SinkRecord sinkRecord : this.createSinkRecords(records, schema)) {
            writer.buffer(sinkRecord);
        }
        writer.recover();
        writer.write();
        writer.close();

        // Every expected file shares the same directory and name prefix; only the offset range differs.
        String topicRoot = (String)this.topicsDir.get("test-topic");
        String filePrefix = this.url + "/" + topicRoot + "/" + "test-topic" + "/partition=" + 12 + "/" + "test-topic" + "+" + 12;
        Set<Path> expectedFiles = new HashSet<Path>();
        for (String offsetRange : new String[]{"+00+02", "+03+05", "+06+08"}) {
            expectedFiles.add(new Path(filePrefix + offsetRange + this.extension));
        }
        this.verify(expectedFiles, 3, records, schema);
    }

    /**
     * Pre-populates the write-ahead log through a {@code WALFileTest.CorruptWriter},
     * calls {@code recover()}, and then checks that a normal write of ten records
     * still produces the three expected zero-padded committed files.
     *
     * <p>NOTE(review): judging by its name, the {@code CorruptWriter} leaves the log
     * in a corrupt state; confirm against {@code WALFileTest}.
     */
    @Test
    public void testWriteRecordDefaultWithPaddingCorruptRecovery() throws Exception {
        this.localProps.put("filename.offset.zero.pad.width", "2");
        this.setUp();
        DefaultPartitioner partitioner = new DefaultPartitioner();
        partitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, (Partitioner)partitioner, this.connectorConfig, (SinkTaskContext)this.context, this.avroData, (io.confluent.common.utils.Time)this.time);

        FSWAL wal = new FSWAL(this.logsDir, TOPIC_PARTITION, this.storage){

            public void acquireLease() throws ConnectException {
                super.acquireLease();
                // Swap in a CorruptWriter unless one is already installed.
                if (this.writer.getClass() != WALFileTest.CorruptWriter.class) {
                    try {
                        this.writer = new WALFileTest.CorruptWriter(TopicPartitionWriterTest.this.storage.conf(), WALFile.Writer.file(new Path(this.getLogFile())), WALFile.Writer.appendIfExists(true));
                    }
                    catch (IOException e) {
                        // Fail loudly: continuing with the regular writer would let the
                        // test pass without ever exercising the corrupt-log path.
                        throw new ConnectException("Failed to install corrupt WAL writer", e);
                    }
                }
            }
        };

        // Log one BEGIN/END block containing 20 temp-file -> committed-file entries.
        wal.append("BEGIN", "");
        String topicsDir = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        for (int i = 0; i < 20; ++i) {
            long startOffset = i * 10;
            long endOffset = (i + 1) * 10 - 1;
            String tempfile = FileUtils.tempFileName(this.url, topicsDir, partitioner.generatePartitionedPath("test-topic", "partition=12"), this.extension);
            fs.createNewFile(new Path(tempfile));
            String committedFile = FileUtils.committedFileName(this.url, topicsDir, partitioner.generatePartitionedPath("test-topic", "partition=12"), TOPIC_PARTITION, startOffset, endOffset, this.extension, this.zeroPadFormat);
            wal.append(tempfile, committedFile);
        }
        wal.append("END", "");
        wal.close();

        topicPartitionWriter.recover();

        Schema schema = this.createSchema();
        List<Struct> records = this.createRecordBatches(schema, 3, 3);
        records.add(this.createRecord(schema));
        List<SinkRecord> sinkRecords = this.createSinkRecords(records, schema);
        for (SinkRecord record : sinkRecords) {
            topicPartitionWriter.buffer(record);
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();

        // Only the files from the fresh write are expected among the committed files.
        Set<Path> expectedFiles = new HashSet<Path>();
        expectedFiles.add(new Path(this.url + "/" + topicsDir + "/" + "test-topic" + "/partition=" + 12 + "/" + "test-topic" + "+" + 12 + "+00+02" + this.extension));
        expectedFiles.add(new Path(this.url + "/" + topicsDir + "/" + "test-topic" + "/partition=" + 12 + "/" + "test-topic" + "+" + 12 + "+03+05" + this.extension));
        expectedFiles.add(new Path(this.url + "/" + topicsDir + "/" + "test-topic" + "/partition=" + 12 + "/" + "test-topic" + "+" + 12 + "+06+08" + this.extension));
        int expectedBatchSize = 3;
        this.verify(expectedFiles, expectedBatchSize, records, schema);
    }

    /**
     * Buffers six records spread over three field partitions (two records each) with
     * {@code flush.size} raised to 10, writes once, and closes the writer. The offset
     * is asserted to be -1 before the write and 0 after it.
     */
    @Test
    public void testCloseMultipleTempFiles() throws Exception {
        this.setUp();
        FieldPartitioner fieldPartitioner = new FieldPartitioner();
        fieldPartitioner.configure(this.parsedConfig);

        // Rebuild the connector config so the larger flush size applies to this writer.
        this.properties.put("flush.size", "10");
        this.connectorConfig = new HdfsSinkConnectorConfig(this.properties);
        TopicPartitionWriter writer = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, fieldPartitioner, this.connectorConfig, this.context, this.avroData, this.time);

        Schema schema = this.createSchema();
        List<Struct> records = new ArrayList<Struct>();
        for (int fieldValue = 16; fieldValue < 19; ++fieldValue) {
            // Two identical records per partition value.
            records.add(this.createRecord(schema, fieldValue, 12.2f));
            records.add(this.createRecord(schema, fieldValue, 12.2f));
        }
        for (SinkRecord sinkRecord : this.createSinkRecords(records, schema)) {
            writer.buffer(sinkRecord);
        }

        Assert.assertEquals(-1L, writer.offset());
        writer.write();
        Assert.assertEquals(0L, writer.offset());
        writer.close();
    }

    /**
     * Writes ten records through a {@link FieldPartitioner}: three records for each of
     * the field values 16, 17 and 18, plus one extra record. Expects one committed file
     * of three records per field value and an offset of 9 after writing, which must
     * survive a close and a second recover.
     */
    @Test
    public void testWriteRecordFieldPartitioner() throws Exception {
        this.setUp();
        FieldPartitioner fieldPartitioner = new FieldPartitioner();
        fieldPartitioner.configure(this.parsedConfig);
        List<?> configuredFields = (List<?>)this.parsedConfig.get("partition.field.name");
        String partitionField = (String)configuredFields.get(0);
        TopicPartitionWriter writer = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, fieldPartitioner, this.connectorConfig, this.context, this.avroData, this.time);

        Schema schema = this.createSchema();
        List<Struct> records = new ArrayList<Struct>();
        for (int fieldValue = 16; fieldValue < 19; ++fieldValue) {
            for (int copy = 0; copy < 3; ++copy) {
                records.add(this.createRecord(schema, fieldValue, 12.2f));
            }
        }
        records.add(this.createRecord(schema));
        Assert.assertEquals(10L, records.size());

        for (SinkRecord sinkRecord : this.createSinkRecords(records, schema)) {
            writer.buffer(sinkRecord);
        }
        Assert.assertEquals(-1L, writer.offset());
        writer.recover();
        Assert.assertEquals(-1L, writer.offset());
        writer.write();
        Assert.assertEquals(9L, writer.offset());
        writer.close();
        Assert.assertEquals(9L, writer.offset());

        // Field value 16 + k is expected in a file covering offsets [3k, 3k + 2].
        String topicRoot = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        Set<Path> expectedFiles = new HashSet<Path>();
        for (int k = 0; k < 3; ++k) {
            String directory = fieldPartitioner.generatePartitionedPath("test-topic", partitionField + "=" + String.valueOf(16 + k));
            long startOffset = 3L * k;
            expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, directory, TOPIC_PARTITION, startOffset, startOffset + 2L, this.extension, this.zeroPadFormat)));
        }
        this.verify(expectedFiles, 3, records, schema);

        writer.recover();
        Assert.assertEquals(9L, writer.offset());
    }

    /**
     * Writes ten records through the hdfs {@code TimeBasedPartitioner} and expects
     * three committed files of three records each under the partition directory
     * encoded from the current wall-clock time.
     *
     * <p>NOTE(review): the expected directory is derived from
     * {@code System.currentTimeMillis()} after the write has finished, so the test
     * presumably assumes both instants fall into the same partition; confirm.
     */
    @Test
    public void testWriteRecordTimeBasedPartition() throws Exception {
        this.setUp();
        // Fully qualified: the simple name TimeBasedPartitioner is imported from two packages.
        io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner timePartitioner = new io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner();
        timePartitioner.configure(this.parsedConfig);
        TopicPartitionWriter writer = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, (Partitioner)timePartitioner, this.connectorConfig, this.context, this.avroData, this.time);

        Schema schema = this.createSchema();
        List<Struct> records = this.createRecordBatches(schema, 3, 3);
        records.add(this.createRecord(schema));
        for (SinkRecord sinkRecord : this.createSinkRecords(records, schema)) {
            writer.buffer(sinkRecord);
        }
        writer.recover();
        writer.write();
        writer.close();

        // Encode "now" using the same duration, path format, and time zone as the config.
        long partitionDurationMs = (Long)this.parsedConfig.get("partition.duration.ms");
        String pathFormat = (String)this.parsedConfig.get("path.format");
        String timeZoneId = (String)this.parsedConfig.get("timezone");
        long now = System.currentTimeMillis();
        String encodedPartition = TimeUtils.encodeTimestamp(partitionDurationMs, pathFormat, timeZoneId, now);
        String directory = timePartitioner.generatePartitionedPath("test-topic", encodedPartition);

        String topicRoot = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        Set<Path> expectedFiles = new HashSet<Path>();
        for (long startOffset = 0L; startOffset < 9L; startOffset += 3L) {
            expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, directory, TOPIC_PARTITION, startOffset, startOffset + 2L, this.extension, this.zeroPadFormat)));
        }
        this.verify(expectedFiles, 3, records, schema);
    }

    /**
     * Uses an {@code HourlyPartitioner} with the {@code RecordField} timestamp
     * extractor. Nine records carry timestamps starting at 2017-03-02 10:00
     * (America/Los_Angeles), nine more start two hours later, and one final record is
     * six hours after the start; timestamps within a group advance by 20 seconds.
     * Expects three files of three records in each of the two hour directories.
     */
    @Test
    public void testWriteRecordTimeBasedPartitionFieldTimestampHours() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        this.setUp();

        this.partitioner = new DataWriter.PartitionerWrapper((io.confluent.connect.storage.partitioner.Partitioner)new HourlyPartitioner());
        this.parsedConfig.put("timestamp.extractor", "RecordField");
        this.partitioner.configure(this.parsedConfig);
        TopicPartitionWriter writer = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, this.partitioner, this.connectorConfig, this.context, this.avroData, this.time);

        Schema schema = this.createSchemaWithTimestampField();
        DateTime start = new DateTime(2017, 3, 2, 10, 0, DateTimeZone.forID("America/Los_Angeles"));
        long stepMs = 20000L;
        int total = 18;
        int half = total / 2;
        List<Struct> records = new ArrayList<Struct>(total);

        // First half: timestamps advance from the start instant.
        long firstGroupTs = start.getMillis();
        for (int i = 0; i < half; ++i) {
            records.add(this.createRecordWithTimestampField(schema, firstGroupTs));
            firstGroupTs += stepMs;
        }
        List<SinkRecord> sinkRecords = this.createSinkRecords(records.subList(0, 9), schema);

        // Second half: same spacing, two hours later.
        long secondGroupTs = start.plusHours(2).getMillis();
        for (int i = half; i < total; ++i) {
            records.add(this.createRecordWithTimestampField(schema, secondGroupTs));
            secondGroupTs += stepMs;
        }
        sinkRecords.addAll(this.createSinkRecords(records.subList(9, 18), schema, 9L, Collections.singleton(new TopicPartition("test-topic", 12))));

        // A single record six hours after the start; it is not added to the verified records.
        long muchLaterTs = start.plusHours(6).getMillis();
        Struct trailingRecord = this.createRecordWithTimestampField(schema, muchLaterTs);
        sinkRecords.addAll(this.createSinkRecords(Collections.singletonList(trailingRecord), schema, 19L, Collections.singleton(new TopicPartition("test-topic", 12))));

        for (SinkRecord sinkRecord : sinkRecords) {
            writer.buffer(sinkRecord);
        }
        writer.recover();
        writer.write();
        writer.close();

        // The group timestamps were advanced in the loops above; their final values are encoded here.
        String encodedFirst = this.getTimebasedEncodedPartition(firstGroupTs);
        String encodedSecond = this.getTimebasedEncodedPartition(secondGroupTs);
        String topicRoot = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        Set<Path> expectedFiles = new HashSet<Path>();

        String firstDir = this.partitioner.generatePartitionedPath("test-topic", encodedFirst);
        for (int startOffset = 0; startOffset < 9; startOffset += 3) {
            expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, firstDir, TOPIC_PARTITION, (long)startOffset, (long)(startOffset + 2), this.extension, this.zeroPadFormat)));
        }
        String secondDir = this.partitioner.generatePartitionedPath("test-topic", encodedSecond);
        for (int startOffset = 9; startOffset < 18; startOffset += 3) {
            expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, secondDir, TOPIC_PARTITION, (long)startOffset, (long)(startOffset + 2), this.extension, this.zeroPadFormat)));
        }
        this.verify(expectedFiles, 3, records, schema);
    }

    /**
     * Uses an {@code HourlyPartitioner} with the {@code Record} timestamp extractor.
     * Eighteen records are sent in two groups of nine whose sink-record timestamps
     * start at 2017-03-02 10:00 (America/Los_Angeles) and two hours later, advancing
     * by 20 seconds. Expects three committed files in the first hour directory and
     * two (starting at offsets 9 and 12) in the later one.
     */
    @Test
    public void testWriteRecordTimeBasedPartitionRecordTimestampHours() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        this.setUp();

        this.partitioner = new DataWriter.PartitionerWrapper((io.confluent.connect.storage.partitioner.Partitioner)new HourlyPartitioner());
        this.parsedConfig.put("timestamp.extractor", "Record");
        this.partitioner.configure(this.parsedConfig);
        TopicPartitionWriter writer = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, this.partitioner, this.connectorConfig, this.context, this.avroData, this.time);

        Schema schema = this.createSchema();
        List<Struct> records = this.createRecordBatches(schema, 3, 6);
        DateTime start = new DateTime(2017, 3, 2, 10, 0, DateTimeZone.forID("America/Los_Angeles"));
        long stepMs = 20000L;
        long firstGroupTs = start.getMillis();
        long secondGroupTs = start.plusHours(2).getMillis();

        List<SinkRecord> sinkRecords = this.createSinkRecordsWithTimestamp(records.subList(0, 9), schema, 0, firstGroupTs, stepMs);
        sinkRecords.addAll(this.createSinkRecordsWithTimestamp(records.subList(9, 18), schema, 9, secondGroupTs, stepMs));
        for (SinkRecord sinkRecord : sinkRecords) {
            writer.buffer(sinkRecord);
        }
        writer.recover();
        writer.write();
        writer.close();

        String topicRoot = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        Set<Path> expectedFiles = new HashSet<Path>();

        String firstDir = this.partitioner.generatePartitionedPath("test-topic", this.getTimebasedEncodedPartition(firstGroupTs));
        for (int startOffset = 0; startOffset <= 6; startOffset += 3) {
            expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, firstDir, TOPIC_PARTITION, (long)startOffset, (long)(startOffset + 2), this.extension, this.zeroPadFormat)));
        }
        // Only the files starting at offsets 9 and 12 are expected in the later directory.
        String secondDir = this.partitioner.generatePartitionedPath("test-topic", this.getTimebasedEncodedPartition(secondGroupTs));
        for (int startOffset = 9; startOffset <= 12; startOffset += 3) {
            expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, secondDir, TOPIC_PARTITION, (long)startOffset, (long)(startOffset + 2), this.extension, this.zeroPadFormat)));
        }
        this.verify(expectedFiles, 3, records, schema);
    }

    /**
     * Same scenario as the hourly record-timestamp test, but with a
     * {@code DailyPartitioner} and the path format
     * {@code 'year'=YYYY/'month'=MM/'day'=dd}. Expects three committed files for the
     * first group of records and two (starting at offsets 9 and 12) for the second.
     */
    @Test
    public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        this.setUp();

        this.partitioner = new DataWriter.PartitionerWrapper((io.confluent.connect.storage.partitioner.Partitioner)new DailyPartitioner());
        this.parsedConfig.put("timestamp.extractor", "Record");
        this.parsedConfig.put("path.format", "'year'=YYYY/'month'=MM/'day'=dd");
        this.partitioner.configure(this.parsedConfig);
        TopicPartitionWriter writer = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, this.partitioner, this.connectorConfig, this.context, this.avroData, this.time);

        Schema schema = this.createSchema();
        List<Struct> records = this.createRecordBatches(schema, 3, 6);
        DateTime start = new DateTime(2017, 3, 2, 10, 0, DateTimeZone.forID("America/Los_Angeles"));
        long stepMs = 20000L;
        long firstGroupTs = start.getMillis();
        long secondGroupTs = start.plusHours(2).getMillis();

        List<SinkRecord> sinkRecords = this.createSinkRecordsWithTimestamp(records.subList(0, 9), schema, 0, firstGroupTs, stepMs);
        sinkRecords.addAll(this.createSinkRecordsWithTimestamp(records.subList(9, 18), schema, 9, secondGroupTs, stepMs));
        for (SinkRecord sinkRecord : sinkRecords) {
            writer.buffer(sinkRecord);
        }
        writer.recover();
        writer.write();
        writer.close();

        String topicRoot = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        Set<Path> expectedFiles = new HashSet<Path>();

        String firstDir = this.partitioner.generatePartitionedPath("test-topic", this.getTimebasedEncodedPartition(firstGroupTs));
        for (int startOffset = 0; startOffset <= 6; startOffset += 3) {
            expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, firstDir, TOPIC_PARTITION, (long)startOffset, (long)(startOffset + 2), this.extension, this.zeroPadFormat)));
        }
        // Both groups fall on the same calendar day, so these paths share a directory with the ones above.
        String secondDir = this.partitioner.generatePartitionedPath("test-topic", this.getTimebasedEncodedPartition(secondGroupTs));
        for (int startOffset = 9; startOffset <= 12; startOffset += 3) {
            expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, secondDir, TOPIC_PARTITION, (long)startOffset, (long)(startOffset + 2), this.extension, this.zeroPadFormat)));
        }
        this.verify(expectedFiles, 3, records, schema);
    }

    /**
     * Drives a writer with the shared mock clock of
     * {@link MockedWallclockTimestampExtractor} and a
     * {@code rotate.schedule.interval.ms} of ten minutes. Two batches of three records
     * are written, each followed by an 11-minute clock advance and another
     * {@code write()} call. Expects one committed file per batch (offsets 0-2 and 3-5).
     *
     * <p>NOTE(review): the 11-minute advance presumably exists to exceed the
     * scheduled rotation interval; confirm against TopicPartitionWriter.
     */
    @Test
    public void testWriteRecordTimeBasedPartitionWallclockMockedWithScheduleRotation() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.HOURS.toMillis(1L)));
        this.localProps.put("rotate.schedule.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(10L)));
        this.setUp();

        this.partitioner = new DataWriter.PartitionerWrapper((io.confluent.connect.storage.partitioner.Partitioner)new io.confluent.connect.storage.partitioner.TimeBasedPartitioner());
        this.parsedConfig.put("partition.duration.ms", TimeUnit.DAYS.toMillis(1L));
        this.parsedConfig.put("timestamp.extractor", MockedWallclockTimestampExtractor.class.getName());
        this.partitioner.configure(this.parsedConfig);

        MockTime mockClock = MockedWallclockTimestampExtractor.TIME;
        // Advance the mock clock by the current system time in milliseconds.
        mockClock.sleep(Time.SYSTEM.milliseconds());
        TopicPartitionWriter writer = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, this.newWriterProvider, this.partitioner, this.connectorConfig, this.context, this.avroData, mockClock);

        Schema schema = this.createSchema();
        List<Struct> records = this.createRecordBatches(schema, 3, 6);
        long rotationStepMs = TimeUnit.MINUTES.toMillis(11L);

        // First batch: offsets 0..2.
        for (SinkRecord sinkRecord : this.createSinkRecords(records.subList(0, 3), schema, 0L, Collections.singleton(new TopicPartition("test-topic", 12)))) {
            writer.buffer(sinkRecord);
        }
        writer.recover();
        writer.write();
        long firstBatchTs = mockClock.milliseconds();
        mockClock.sleep(rotationStepMs);
        writer.write();

        // Second batch: offsets 3..5.
        for (SinkRecord sinkRecord : this.createSinkRecords(records.subList(3, 6), schema, 3L, Collections.singleton(new TopicPartition("test-topic", 12)))) {
            writer.buffer(sinkRecord);
        }
        writer.write();
        long secondBatchTs = mockClock.milliseconds();
        mockClock.sleep(rotationStepMs);
        writer.write();
        writer.close();

        String topicRoot = (String)this.topicsDir.get(TOPIC_PARTITION.topic());
        String firstDir = this.partitioner.generatePartitionedPath("test-topic", this.getTimebasedEncodedPartition(firstBatchTs));
        String secondDir = this.partitioner.generatePartitionedPath("test-topic", this.getTimebasedEncodedPartition(secondBatchTs));

        Set<Path> expectedFiles = new HashSet<Path>();
        expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, firstDir, TOPIC_PARTITION, 0L, 2L, this.extension, this.zeroPadFormat)));
        expectedFiles.add(new Path(FileUtils.committedFileName(this.url, topicRoot, secondDir, TOPIC_PARTITION, 3L, 5L, this.extension, this.zeroPadFormat)));
        this.verify(expectedFiles, 3, records, schema);
    }

    /**
     * Encodes {@code timestamp} into a partition string using the
     * {@code partition.duration.ms}, {@code path.format} and {@code timezone} values
     * currently held in {@code parsedConfig}.
     *
     * @param timestamp the timestamp to encode, passed through to {@code TimeUtils.encodeTimestamp}
     * @return the encoded partition string
     */
    private String getTimebasedEncodedPartition(long timestamp) {
        long durationMs = (Long)this.parsedConfig.get("partition.duration.ms");
        return TimeUtils.encodeTimestamp(
            durationMs,
            (String)this.parsedConfig.get("path.format"),
            (String)this.parsedConfig.get("timezone"),
            timestamp);
    }

    /**
     * Creates the directory for {@code topic} under {@code topicsDir} on the test
     * file system unless it already exists.
     *
     * @throws IOException if the existence check or directory creation fails
     */
    private void createTopicDir(String url, String topicsDir, String topic) throws IOException {
        Path topicPath = new Path(FileUtils.topicDirectory(url, topicsDir, topic));
        if (fs.exists(topicPath)) {
            return;
        }
        fs.mkdirs(topicPath);
    }

    /**
     * Creates {@code url + "/" + logsDir} on the test file system unless it already exists.
     *
     * @throws IOException if the existence check or directory creation fails
     */
    private void createLogsDir(String url, String logsDir) throws IOException {
        Path logsPath = new Path(url + "/" + logsDir);
        if (fs.exists(logsPath)) {
            return;
        }
        fs.mkdirs(logsPath);
    }

    /**
     * Checks the committed files under the {@code test-topic} directory: their count
     * must equal {@code expectedFiles.size()}, each path must be in
     * {@code expectedFiles}, each file must hold {@code expectedSize} records, and the
     * records read must equal the Avro conversion of {@code records}, consumed in
     * sequence across files in the order the traversal returns them.
     *
     * @param expectedFiles paths that are allowed to appear as committed files
     * @param expectedSize  number of records expected in every file
     * @param records       source records, in the order they are expected to be read back
     * @param schema        Connect schema used to convert {@code records} for comparison
     * @throws IOException if traversing or reading the files fails
     */
    private void verify(Set<Path> expectedFiles, int expectedSize, List<Struct> records, Schema schema) throws IOException {
        String topicRoot = (String)this.topicsDir.get("test-topic");
        Path topicPath = new Path(FileUtils.topicDirectory(this.url, topicRoot, "test-topic"));
        FileStatus[] committed = FileUtils.traverse((Storage)this.storage, topicPath, new CommittedFileFilter());
        Assert.assertEquals(expectedFiles.size(), committed.length);

        // Position of the next source record to compare; it carries over from file to file.
        int nextRecord = 0;
        for (int fileIdx = 0; fileIdx < committed.length; ++fileIdx) {
            Path filePath = committed[fileIdx].getPath();
            Assert.assertTrue(expectedFiles.contains(filePath));

            Collection<Object> fileContents = this.dataFileReader.readData(this.connectorConfig.getHadoopConfiguration(), filePath);
            Assert.assertEquals(expectedSize, fileContents.size());
            for (Object actual : fileContents) {
                Struct source = records.get(nextRecord);
                nextRecord++;
                Assert.assertEquals(this.avroData.fromConnectData(schema, source), actual);
            }
        }
    }

    public static class MockedWallclockTimestampExtractor
    extends TimeBasedPartitioner.WallclockTimestampExtractor {
        public static final MockTime TIME = new MockTime();

        public void configure(Map<String, Object> config) {
        }

        public Long extract(ConnectRecord<?> record) {
            return TIME.milliseconds();
        }
    }
}

