package io.confluent.connect.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import io.confluent.common.utils.MockTime;
import io.confluent.common.utils.SystemTime;
import io.confluent.connect.s3.format.avro.AvroFormat;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.s3.util.FileUtils;
import io.confluent.connect.s3.util.TimeUtils;
import io.confluent.connect.storage.format.RecordWriterProvider;
import io.confluent.connect.storage.partitioner.DailyPartitioner;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import io.confluent.connect.storage.partitioner.FieldPartitioner;
import io.confluent.connect.storage.partitioner.HourlyPartitioner;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.partitioner.TimestampExtractor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/s3/TopicPartitionWriterTest.class */
public class TopicPartitionWriterTest extends TestWithMockedS3 {
    private static final String ZERO_PAD_FMT = "%010d";
    private RecordWriterProvider<S3SinkConnectorConfig> writerProvider;
    private S3Storage storage;
    private AvroFormat format;
    private static String extension;
    private AmazonS3 s3;
    Map<String, String> localProps = new HashMap();

    /* loaded from: input_file:io/confluent/connect/s3/TopicPartitionWriterTest$MockedWallclockTimestampExtractor.class */
    public static class MockedWallclockTimestampExtractor implements TimestampExtractor {
        public final MockTime time = new MockTime();

        public void configure(Map<String, Object> map) {
        }

        public Long extract(ConnectRecord<?> connectRecord) {
            return Long.valueOf(this.time.milliseconds());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.connect.s3.TestWithMockedS3, io.confluent.connect.s3.S3SinkConnectorTestBase
    public Map<String, String> createProps() {
        Map<String, String> createProps = super.createProps();
        createProps.putAll(this.localProps);
        return createProps;
    }

    @Override // io.confluent.connect.s3.TestWithMockedS3, io.confluent.connect.s3.S3SinkConnectorTestBase
    public void setUp() throws Exception {
        super.setUp();
        this.s3 = newS3Client(this.connectorConfig);
        this.storage = new S3Storage(this.connectorConfig, this.url, "kafka.bucket", this.s3);
        this.format = new AvroFormat(this.storage);
        this.writerProvider = new AvroFormat(this.storage).getRecordWriterProvider();
        extension = this.writerProvider.getExtension();
    }

    @Override // io.confluent.connect.s3.TestWithMockedS3, io.confluent.connect.s3.S3SinkConnectorTestBase
    @After
    public void tearDown() throws Exception {
        super.tearDown();
        this.localProps.clear();
    }

    @Test
    public void testWriteRecordDefaultWithPaddingSeq() throws Exception {
        this.localProps.put("filename.offset.zero.pad.width", "2");
        setUp();
        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
        defaultPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, defaultPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatch = createRecordBatch(createSchema, 10);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatch, "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String generatePartitionedPath = defaultPartitioner.generatePartitionedPath("test-topic", "partition=12");
        ArrayList arrayList = new ArrayList();
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 0L, extension, "%02d"));
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 3L, extension, "%02d"));
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 6L, extension, "%02d"));
        verify(arrayList, 3, createSchema, createRecordBatch);
    }

    @Test
    public void testWriteRecordDefaultWithPadding() throws Exception {
        this.localProps.put("filename.offset.zero.pad.width", "2");
        setUp();
        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
        defaultPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, defaultPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 3);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches, "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String generatePartitionedPath = defaultPartitioner.generatePartitionedPath("test-topic", "partition=12");
        ArrayList arrayList = new ArrayList();
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 0L, extension, "%02d"));
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 3L, extension, "%02d"));
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 6L, extension, "%02d"));
        verify(arrayList, 3, createSchema, createRecordBatches);
    }

    @Test
    public void testWriteRecordFieldPartitioner() throws Exception {
        this.localProps.put("flush.size", "9");
        setUp();
        FieldPartitioner fieldPartitioner = new FieldPartitioner();
        fieldPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, fieldPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches(createSchema, 3, 6), "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String str = (String) ((List) this.parsedConfig.get("partition.field.name")).get(0);
        String generatePartitionedPath = fieldPartitioner.generatePartitionedPath("test-topic", str + "=" + String.valueOf(16));
        String generatePartitionedPath2 = fieldPartitioner.generatePartitionedPath("test-topic", str + "=" + String.valueOf(17));
        String generatePartitionedPath3 = fieldPartitioner.generatePartitionedPath("test-topic", str + "=" + String.valueOf(18));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 3; i++) {
            for (int i2 = 0; i2 < 6; i2++) {
                arrayList.add(createRecord(createSchema, 16 + i, 12.2f + i));
            }
        }
        ArrayList arrayList2 = new ArrayList();
        for (int i3 = 0; i3 < 18; i3 += 9) {
            arrayList2.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, i3, extension, ZERO_PAD_FMT));
            arrayList2.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath2, TOPIC_PARTITION, i3 + 1, extension, ZERO_PAD_FMT));
            arrayList2.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath3, TOPIC_PARTITION, i3 + 2, extension, ZERO_PAD_FMT));
        }
        verify(arrayList2, 3, createSchema, arrayList);
    }

    @Test
    public void testWriteRecordTimeBasedPartitionWallclockRealtime() throws Exception {
        setUp();
        long millis = TimeUnit.SECONDS.toMillis(5L);
        long millis2 = TimeUnit.MINUTES.toMillis(1L);
        TimeBasedPartitioner timeBasedPartitioner = new TimeBasedPartitioner();
        this.parsedConfig.put("partition.duration.ms", Long.valueOf(millis2));
        this.parsedConfig.put("path.format", "'year'=YYYY_'month'=MM_'day'=dd_'hour'=HH_'min'=mm_'sec'=ss");
        timeBasedPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, timeBasedPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 6);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches.subList(0, 9), "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        long milliseconds = Time.SYSTEM.milliseconds();
        topicPartitionWriter.write();
        Time.SYSTEM.sleep(millis);
        Iterator<SinkRecord> it2 = createSinkRecords(createRecordBatches.subList(9, 18), "key", createSchema, 9).iterator();
        while (it2.hasNext()) {
            topicPartitionWriter.buffer(it2.next());
        }
        long milliseconds2 = Time.SYSTEM.milliseconds();
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String timebasedEncodedPartition = getTimebasedEncodedPartition(milliseconds);
        String timebasedEncodedPartition2 = getTimebasedEncodedPartition(milliseconds2);
        String generatePartitionedPath = timeBasedPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition);
        ArrayList arrayList = new ArrayList();
        int length = new int[]{0, 3, 6}.length;
        for (int i = 0; i < length; i++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, r0[i], extension, ZERO_PAD_FMT));
        }
        String generatePartitionedPath2 = timeBasedPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition2);
        int length2 = new int[]{9, 12, 15}.length;
        for (int i2 = 0; i2 < length2; i2++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath2, TOPIC_PARTITION, r0[i2], extension, ZERO_PAD_FMT));
        }
        verify(arrayList, 3, createSchema, createRecordBatches);
    }

    @Test
    public void testWriteRecordTimeBasedPartitionWallclockMocked() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.HOURS.toMillis(1L)));
        setUp();
        TimeBasedPartitioner timeBasedPartitioner = new TimeBasedPartitioner();
        this.parsedConfig.put("partition.duration.ms", Long.valueOf(TimeUnit.DAYS.toMillis(1L)));
        this.parsedConfig.put("timestamp.extractor", MockedWallclockTimestampExtractor.class.getName());
        timeBasedPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, timeBasedPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 6);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches.subList(0, 9), "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        MockTime mockTime = ((MockedWallclockTimestampExtractor) timeBasedPartitioner.getTimestampExtractor()).time;
        mockTime.sleep(Time.SYSTEM.milliseconds());
        long milliseconds = mockTime.milliseconds();
        topicPartitionWriter.write();
        mockTime.sleep(7200000L);
        Iterator<SinkRecord> it2 = createSinkRecords(createRecordBatches.subList(9, 18), "key", createSchema, 9).iterator();
        while (it2.hasNext()) {
            topicPartitionWriter.buffer(it2.next());
        }
        long milliseconds2 = mockTime.milliseconds();
        topicPartitionWriter.write();
        mockTime.sleep(3600001L);
        Iterator<SinkRecord> it3 = createSinkRecords(createRecordBatches.subList(17, 18), "key", createSchema, 1).iterator();
        while (it3.hasNext()) {
            topicPartitionWriter.buffer(it3.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String timebasedEncodedPartition = getTimebasedEncodedPartition(milliseconds);
        String timebasedEncodedPartition2 = getTimebasedEncodedPartition(milliseconds2);
        String generatePartitionedPath = timeBasedPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition);
        ArrayList arrayList = new ArrayList();
        int length = new int[]{0}.length;
        for (int i = 0; i < length; i++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, r0[i], extension, ZERO_PAD_FMT));
        }
        String generatePartitionedPath2 = timeBasedPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition2);
        int length2 = new int[]{9}.length;
        for (int i2 = 0; i2 < length2; i2++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath2, TOPIC_PARTITION, r0[i2], extension, ZERO_PAD_FMT));
        }
        verify(arrayList, 9, createSchema, createRecordBatches);
    }

    @Test
    public void testWriteRecordTimeBasedPartitionRecordTimestampHours() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        setUp();
        HourlyPartitioner hourlyPartitioner = new HourlyPartitioner();
        this.parsedConfig.put("rotate.interval.ms", Long.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        this.parsedConfig.put("timestamp.extractor", "Record");
        hourlyPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, hourlyPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 6);
        DateTime dateTime = new DateTime(2017, 3, 2, 10, 0, DateTimeZone.forID("America/Los_Angeles"));
        long millis = dateTime.getMillis();
        List<SinkRecord> createSinkRecordsWithTimestamp = createSinkRecordsWithTimestamp(createRecordBatches.subList(0, 9), "key", createSchema, 0, millis, 20000L);
        long millis2 = dateTime.plusHours(2).getMillis();
        createSinkRecordsWithTimestamp.addAll(createSinkRecordsWithTimestamp(createRecordBatches.subList(9, 18), "key", createSchema, 9, millis2, 20000L));
        Iterator<SinkRecord> it = createSinkRecordsWithTimestamp.iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String timebasedEncodedPartition = getTimebasedEncodedPartition(millis);
        String timebasedEncodedPartition2 = getTimebasedEncodedPartition(millis2);
        String generatePartitionedPath = hourlyPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition);
        ArrayList arrayList = new ArrayList();
        int length = new int[]{0, 3, 6}.length;
        for (int i = 0; i < length; i++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, r0[i], extension, ZERO_PAD_FMT));
        }
        String generatePartitionedPath2 = hourlyPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition2);
        int length2 = new int[]{9, 12}.length;
        for (int i2 = 0; i2 < length2; i2++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath2, TOPIC_PARTITION, r0[i2], extension, ZERO_PAD_FMT));
        }
        verify(arrayList, 3, createSchema, createRecordBatches);
    }

    @Test
    public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        setUp();
        DailyPartitioner dailyPartitioner = new DailyPartitioner();
        this.parsedConfig.put("timestamp.extractor", "Record");
        this.parsedConfig.put("path.format", "'year'=YYYY_'month'=MM_'day'=dd");
        dailyPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, dailyPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 6);
        DateTime dateTime = new DateTime(2017, 3, 2, 10, 0, DateTimeZone.forID("America/Los_Angeles"));
        long millis = dateTime.getMillis();
        List<SinkRecord> createSinkRecordsWithTimestamp = createSinkRecordsWithTimestamp(createRecordBatches.subList(0, 9), "key", createSchema, 0, millis, 20000L);
        long millis2 = dateTime.plusHours(2).getMillis();
        createSinkRecordsWithTimestamp.addAll(createSinkRecordsWithTimestamp(createRecordBatches.subList(9, 18), "key", createSchema, 9, millis2, 20000L));
        Iterator<SinkRecord> it = createSinkRecordsWithTimestamp.iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String timebasedEncodedPartition = getTimebasedEncodedPartition(millis);
        String timebasedEncodedPartition2 = getTimebasedEncodedPartition(millis2);
        String generatePartitionedPath = dailyPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition);
        ArrayList arrayList = new ArrayList();
        int length = new int[]{0, 3, 6}.length;
        for (int i = 0; i < length; i++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, r0[i], extension, ZERO_PAD_FMT));
        }
        String generatePartitionedPath2 = dailyPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition2);
        int length2 = new int[]{9, 12}.length;
        for (int i2 = 0; i2 < length2; i2++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath2, TOPIC_PARTITION, r0[i2], extension, ZERO_PAD_FMT));
        }
        verify(arrayList, 3, createSchema, createRecordBatches);
    }

    @Test(expected = ConnectException.class)
    public void testWriteRecordTimeBasedPartitionWithNullTimestamp() throws Exception {
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        setUp();
        TimeBasedPartitioner timeBasedPartitioner = new TimeBasedPartitioner();
        this.parsedConfig.put("timestamp.extractor", "Record");
        this.parsedConfig.put("path.format", "'year'=YYYY_'month'=MM_'day'=dd");
        this.parsedConfig.put("partition.duration.ms", Long.valueOf(TimeUnit.DAYS.toMillis(1L)));
        timeBasedPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, timeBasedPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches(createSchema, 1, 1), "key", createSchema, 0).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
    }

    @Test
    public void testWallclockUsesBatchTimePartitionBoundary() throws Exception {
        this.localProps.put("flush.size", "6");
        setUp();
        TimeBasedPartitioner timeBasedPartitioner = new TimeBasedPartitioner();
        this.parsedConfig.put("partition.duration.ms", Long.valueOf(TimeUnit.DAYS.toMillis(1L)));
        this.parsedConfig.put("timestamp.extractor", TimeBasedPartitioner.WallclockTimestampExtractor.class.getName());
        timeBasedPartitioner.configure(this.parsedConfig);
        io.confluent.common.utils.Time time = (io.confluent.common.utils.Time) EasyMock.createMock(SystemTime.class);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.writerProvider, timeBasedPartitioner, this.connectorConfig, this.context, time);
        EasyMock.expect(Long.valueOf(time.milliseconds())).andReturn(3599000L);
        EasyMock.replay(new Object[]{time});
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 6);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches.subList(0, 9), "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        ArrayList arrayList = new ArrayList();
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, timeBasedPartitioner.generatePartitionedPath("test-topic", getTimebasedEncodedPartition(3599000L)), TOPIC_PARTITION, 0L, extension, ZERO_PAD_FMT));
        verify(arrayList, 6, createSchema, createRecordBatches);
    }

    @Test
    public void testWriteRecordTimeBasedPartitionWallclockMockedWithScheduleRotation() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.HOURS.toMillis(1L)));
        this.localProps.put("rotate.schedule.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(10L)));
        setUp();
        TimeBasedPartitioner timeBasedPartitioner = new TimeBasedPartitioner();
        this.parsedConfig.put("partition.duration.ms", Long.valueOf(TimeUnit.DAYS.toMillis(1L)));
        this.parsedConfig.put("timestamp.extractor", MockedWallclockTimestampExtractor.class.getName());
        timeBasedPartitioner.configure(this.parsedConfig);
        MockTime mockTime = ((MockedWallclockTimestampExtractor) timeBasedPartitioner.getTimestampExtractor()).time;
        mockTime.sleep(Time.SYSTEM.milliseconds());
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.writerProvider, timeBasedPartitioner, this.connectorConfig, this.context, mockTime);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 6);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches.subList(0, 3), "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        long milliseconds = mockTime.milliseconds();
        mockTime.sleep(TimeUnit.MINUTES.toMillis(11L));
        topicPartitionWriter.write();
        Iterator<SinkRecord> it2 = createSinkRecords(createRecordBatches.subList(3, 6), "key", createSchema, 3).iterator();
        while (it2.hasNext()) {
            topicPartitionWriter.buffer(it2.next());
        }
        topicPartitionWriter.write();
        long milliseconds2 = mockTime.milliseconds();
        mockTime.sleep(TimeUnit.MINUTES.toMillis(11L));
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String timebasedEncodedPartition = getTimebasedEncodedPartition(milliseconds);
        String timebasedEncodedPartition2 = getTimebasedEncodedPartition(milliseconds2);
        String generatePartitionedPath = timeBasedPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition);
        ArrayList arrayList = new ArrayList();
        int length = new int[]{0}.length;
        for (int i = 0; i < length; i++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, r0[i], extension, ZERO_PAD_FMT));
        }
        String generatePartitionedPath2 = timeBasedPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition2);
        int length2 = new int[]{3}.length;
        for (int i2 = 0; i2 < length2; i2++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath2, TOPIC_PARTITION, r0[i2], extension, ZERO_PAD_FMT));
        }
        verify(arrayList, 3, createSchema, createRecordBatches);
    }

    @Test
    public void testWriteRecordTimeBasedPartitionFieldTimestampHours() throws Exception {
        this.localProps.put("flush.size", "1000");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        setUp();
        HourlyPartitioner hourlyPartitioner = new HourlyPartitioner();
        this.parsedConfig.put("timestamp.extractor", "RecordField");
        hourlyPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, hourlyPartitioner, this.connectorConfig, this.context);
        Schema createSchemaWithTimestampField = createSchemaWithTimestampField();
        DateTime dateTime = new DateTime(2017, 3, 2, 10, 0, DateTimeZone.forID("America/Los_Angeles"));
        long millis = dateTime.getMillis();
        ArrayList arrayList = new ArrayList(18);
        for (int i = 0; i < 18 / 2; i++) {
            arrayList.add(createRecordWithTimestampField(createSchemaWithTimestampField, millis));
            millis += 20000;
        }
        List<SinkRecord> createSinkRecords = createSinkRecords(arrayList.subList(0, 9), "key", createSchemaWithTimestampField);
        long millis2 = dateTime.plusHours(2).getMillis();
        for (int i2 = 18 / 2; i2 < 18; i2++) {
            arrayList.add(createRecordWithTimestampField(createSchemaWithTimestampField, millis2));
            millis2 += 20000;
        }
        createSinkRecords.addAll(createSinkRecords(arrayList.subList(9, 18), "key", createSchemaWithTimestampField, 9));
        createSinkRecords.addAll(createSinkRecords(Collections.singletonList(createRecordWithTimestampField(createSchemaWithTimestampField, dateTime.plusHours(6).getMillis())), "key", createSchemaWithTimestampField, 19));
        Iterator<SinkRecord> it = createSinkRecords.iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String timebasedEncodedPartition = getTimebasedEncodedPartition(millis);
        String timebasedEncodedPartition2 = getTimebasedEncodedPartition(millis2);
        String generatePartitionedPath = hourlyPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition);
        List<String> arrayList2 = new ArrayList<>();
        int length = new int[]{0, 3, 6}.length;
        for (int i3 = 0; i3 < length; i3++) {
            arrayList2.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, r0[i3], extension, ZERO_PAD_FMT));
        }
        String generatePartitionedPath2 = hourlyPartitioner.generatePartitionedPath("test-topic", timebasedEncodedPartition2);
        int length2 = new int[]{9, 12, 15}.length;
        for (int i4 = 0; i4 < length2; i4++) {
            arrayList2.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath2, TOPIC_PARTITION, r0[i4], extension, ZERO_PAD_FMT));
        }
        verify(arrayList2, 3, createSchemaWithTimestampField, arrayList);
    }

    private String getTimebasedEncodedPartition(long j) {
        return TimeUtils.encodeTimestamp(((Long) this.parsedConfig.get("partition.duration.ms")).longValue(), (String) this.parsedConfig.get("path.format"), (String) this.parsedConfig.get("timezone"), j);
    }

    @Test
    public void testNoFilesWrittenWithoutCommit() throws Exception {
        this.localProps.put("flush.size", "10");
        setUp();
        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
        defaultPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, defaultPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 3);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches, "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        verify(Collections.emptyList(), -1, createSchema, createRecordBatches);
    }

    @Test
    public void testRotateIntervalIsIgnoredWhenUsedWithNoTimeBasedPartitioner() throws Exception {
        this.localProps.put("flush.size", "10");
        this.localProps.put("rotate.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(1L)));
        setUp();
        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
        defaultPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, defaultPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 3);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches, "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        verify(Collections.emptyList(), -1, createSchema, createRecordBatches);
    }

    @Test
    public void testWriteRecordDefaultWithEmptyTopicsDir() throws Exception {
        this.localProps.put("topics.dir", "");
        setUp();
        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
        defaultPartitioner.configure(this.parsedConfig);
        TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(TOPIC_PARTITION, this.storage, this.writerProvider, defaultPartitioner, this.connectorConfig, this.context);
        Schema createSchema = createSchema();
        List<Struct> createRecordBatches = createRecordBatches(createSchema, 3, 3);
        Iterator<SinkRecord> it = createSinkRecords(createRecordBatches, "key", createSchema).iterator();
        while (it.hasNext()) {
            topicPartitionWriter.buffer(it.next());
        }
        topicPartitionWriter.write();
        topicPartitionWriter.close();
        String generatePartitionedPath = defaultPartitioner.generatePartitionedPath("test-topic", "partition=12");
        ArrayList arrayList = new ArrayList();
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 0L, extension, ZERO_PAD_FMT));
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 3L, extension, ZERO_PAD_FMT));
        arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, generatePartitionedPath, TOPIC_PARTITION, 6L, extension, ZERO_PAD_FMT));
        verify(arrayList, 3, createSchema, createRecordBatches);
    }

    private Struct createRecord(Schema schema, int i, float f) {
        return new Struct(schema).put("boolean", true).put("int", Integer.valueOf(i)).put("long", Long.valueOf(i)).put("float", Float.valueOf(f)).put("double", Double.valueOf(f));
    }

    private List<Struct> createRecordBatch(Schema schema, int i) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(createRecord(schema, 16 + i2, 12.2f + i2));
        }
        return arrayList;
    }

    private List<Struct> createRecordBatches(Schema schema, int i, int i2) {
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < i2; i3++) {
            arrayList.addAll(createRecordBatch(schema, i));
        }
        return arrayList;
    }

    private List<SinkRecord> createSinkRecords(List<Struct> list, String str, Schema schema) {
        return createSinkRecords(list, str, schema, 0);
    }

    private List<SinkRecord> createSinkRecords(List<Struct> list, String str, Schema schema, int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < list.size(); i2++) {
            arrayList.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, str, schema, list.get(i2), i2 + i));
        }
        return arrayList;
    }

    private List<SinkRecord> createSinkRecordsWithTimestamp(List<Struct> list, String str, Schema schema, int i, long j, long j2) {
        ArrayList arrayList = new ArrayList();
        int i2 = 0;
        int i3 = i;
        while (i2 < list.size()) {
            arrayList.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, str, schema, list.get(i2), i3, Long.valueOf(j + (i3 * j2)), TimestampType.CREATE_TIME));
            i2++;
            i3++;
        }
        return arrayList;
    }

    private void verify(List<String> list, int i, Schema schema, List<Struct> list2) throws IOException {
        List<S3ObjectSummary> listObjects = listObjects("kafka.bucket", null, this.s3);
        ArrayList arrayList = new ArrayList();
        Iterator<S3ObjectSummary> it = listObjects.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getKey());
        }
        Collections.sort(arrayList);
        Collections.sort(list);
        Assert.assertThat(arrayList, CoreMatchers.is(list));
        int i2 = 0;
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            Collection<Object> readRecordsAvro = readRecordsAvro("kafka.bucket", (String) it2.next(), this.s3);
            Assert.assertEquals(i, readRecordsAvro.size());
            Iterator<Object> it3 = readRecordsAvro.iterator();
            while (it3.hasNext()) {
                int i3 = i2;
                i2++;
                Assert.assertEquals(this.format.getAvroData().fromConnectData(schema, list2.get(i3)), it3.next());
            }
        }
    }
}
