package io.confluent.connect.hdfs.avro;

import io.confluent.common.utils.MockTime;
import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
import io.confluent.connect.hdfs.avro.TopicPartitionWriterTest;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.wal.WAL;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/hdfs/avro/DataWriterAvroTest.class */
public class DataWriterAvroTest extends TestWithMiniDFSCluster {
    @Override // io.confluent.connect.hdfs.TestWithMiniDFSCluster, io.confluent.connect.hdfs.HdfsSinkConnectorTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.dataFileReader = new AvroDataFileReader();
        this.extension = ".avro";
    }

    @Test
    public void testWriteRecord() throws Exception {
        DataWriter dataWriter = new DataWriter(this.connectorConfig, this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        dataWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> createSinkRecords = createSinkRecords(7);
        dataWriter.write(createSinkRecords);
        dataWriter.close();
        dataWriter.stop();
        verify(createSinkRecords, new long[]{0, 3, 6});
    }

    @Test
    public void testRecovery() throws Exception {
        this.fs.delete(new Path(FileUtils.directoryName(this.url, this.topicsDir, TOPIC_PARTITION)), true);
        HdfsStorage hdfsStorage = new HdfsStorage(this.connectorConfig, this.url);
        DataWriter dataWriter = new DataWriter(this.connectorConfig, this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        WAL wal = hdfsStorage.wal(this.logsDir, TOPIC_PARTITION);
        wal.append("BEGIN", "");
        for (int i = 0; i < 5; i++) {
            String tempFileName = FileUtils.tempFileName(this.url, this.topicsDir, getDirectory(), this.extension);
            this.fs.createNewFile(new Path(tempFileName));
            wal.append(tempFileName, FileUtils.committedFileName(this.url, this.topicsDir, getDirectory(), TOPIC_PARTITION, i * 10, ((i + 1) * 10) - 1, this.extension, this.zeroPadFormat));
        }
        wal.append("END", "");
        wal.close();
        dataWriter.recover(TOPIC_PARTITION);
        Map offsets = this.context.offsets();
        Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION));
        Assert.assertEquals(50L, ((Long) offsets.get(TOPIC_PARTITION)).longValue());
        dataWriter.write(createSinkRecords(3, 50L));
        dataWriter.close();
        dataWriter.stop();
        verifyFileListing(new long[]{0, 10, 20, 30, 40, 50, 53}, Collections.singleton(new TopicPartition("test-topic", 12)));
    }

    @Test
    public void testWriteRecordMultiplePartitions() throws Exception {
        DataWriter dataWriter = new DataWriter(this.connectorConfig, this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        Iterator it = this.context.assignment().iterator();
        while (it.hasNext()) {
            dataWriter.recover((TopicPartition) it.next());
        }
        List<SinkRecord> createSinkRecords = createSinkRecords(7, 0L, this.context.assignment());
        dataWriter.write(createSinkRecords);
        dataWriter.close();
        dataWriter.stop();
        verify(createSinkRecords, new long[]{0, 3, 6}, this.context.assignment());
    }

    @Test
    public void testWriteInterleavedRecordsInMultiplePartitions() throws Exception {
        DataWriter dataWriter = new DataWriter(this.connectorConfig, this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        Iterator it = this.context.assignment().iterator();
        while (it.hasNext()) {
            dataWriter.recover((TopicPartition) it.next());
        }
        List<SinkRecord> createSinkRecordsInterleaved = createSinkRecordsInterleaved(7 * this.context.assignment().size(), 0L, this.context.assignment());
        dataWriter.write(createSinkRecordsInterleaved);
        dataWriter.close();
        dataWriter.stop();
        verify(createSinkRecordsInterleaved, new long[]{0, 3, 6}, this.context.assignment());
    }

    @Test
    public void testWriteInterleavedRecordsInMultiplePartitionsNonZeroInitialOffset() throws Exception {
        DataWriter dataWriter = new DataWriter(this.connectorConfig, this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        List<SinkRecord> createSinkRecordsInterleaved = createSinkRecordsInterleaved(7 * this.context.assignment().size(), 9L, this.context.assignment());
        dataWriter.write(createSinkRecordsInterleaved);
        dataWriter.close();
        dataWriter.stop();
        verify(createSinkRecordsInterleaved, new long[]{9, 12, 15}, this.context.assignment());
    }

    @Test
    public void testGetPreviousOffsets() throws Exception {
        String str = "test-topic/partition=" + String.valueOf(12);
        long[] jArr = {0, 3};
        long[] jArr2 = {2, 5};
        for (int i = 0; i < jArr.length; i++) {
            this.fs.createNewFile(new Path(FileUtils.committedFileName(this.url, this.topicsDir, str, TOPIC_PARTITION, jArr[i], jArr2[i], this.extension, this.zeroPadFormat)));
        }
        this.fs.createNewFile(new Path(FileUtils.tempFileName(this.url, this.topicsDir, str, this.extension)));
        this.fs.createNewFile(new Path(FileUtils.fileName(this.url, this.topicsDir, str, "abcd")));
        DataWriter dataWriter = new DataWriter(this.connectorConfig, this.context, this.avroData);
        dataWriter.recover(TOPIC_PARTITION);
        Map committedOffsets = dataWriter.getCommittedOffsets();
        Assert.assertTrue(committedOffsets.containsKey(TOPIC_PARTITION));
        Assert.assertEquals(((Long) committedOffsets.get(TOPIC_PARTITION)).longValue(), 6L);
        dataWriter.close();
        dataWriter.stop();
    }

    @Test
    public void testWriteRecordNonZeroInitialOffset() throws Exception {
        DataWriter dataWriter = new DataWriter(this.connectorConfig, this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        dataWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> createSinkRecords = createSinkRecords(7, 3L);
        dataWriter.write(createSinkRecords);
        dataWriter.close();
        dataWriter.stop();
        verify(createSinkRecords, new long[]{3, 6, 9});
    }

    @Test
    public void testRebalance() throws Exception {
        DataWriter dataWriter = new DataWriter(this.connectorConfig, this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        HashSet hashSet = new HashSet(this.context.assignment());
        Iterator<TopicPartition> it = hashSet.iterator();
        while (it.hasNext()) {
            dataWriter.recover(it.next());
        }
        HashSet hashSet2 = new HashSet();
        hashSet2.add(TOPIC_PARTITION);
        hashSet2.add(TOPIC_PARTITION3);
        List<SinkRecord> createSinkRecords = createSinkRecords(7, 0L, hashSet);
        dataWriter.write(createSinkRecords);
        dataWriter.close();
        this.context.setAssignment(hashSet2);
        dataWriter.open(hashSet2);
        Assert.assertEquals((Object) null, dataWriter.getBucketWriter(TOPIC_PARTITION2));
        Assert.assertNotNull(dataWriter.getBucketWriter(TOPIC_PARTITION));
        Assert.assertNotNull(dataWriter.getBucketWriter(TOPIC_PARTITION3));
        verify(createSinkRecords, new long[]{0, 3, 6}, Collections.singleton(TOPIC_PARTITION2), true);
        List<SinkRecord> createSinkRecords2 = createSinkRecords(3, 6L, this.context.assignment());
        dataWriter.write(createSinkRecords2);
        dataWriter.close();
        dataWriter.stop();
        verify(createSinkRecords2, new long[]{6, 9}, Collections.singleton(TOPIC_PARTITION), true);
        verify(createSinkRecords2, new long[]{6, 9}, Collections.singleton(TOPIC_PARTITION3), true);
    }

    @Test
    public void testProjectBackWard() throws Exception {
        Map<String, String> createProps = createProps();
        createProps.put("flush.size", "2");
        createProps.put("schema.compatibility", "BACKWARD");
        DataWriter dataWriter = new DataWriter(new HdfsSinkConnectorConfig(createProps), this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        dataWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> createSinkRecordsWithAlternatingSchemas = createSinkRecordsWithAlternatingSchemas(7, 0L);
        dataWriter.write(createSinkRecordsWithAlternatingSchemas);
        dataWriter.close();
        dataWriter.stop();
        verify(createSinkRecordsWithAlternatingSchemas, new long[]{0, 1, 3, 5, 7});
    }

    @Test
    public void testProjectNone() throws Exception {
        Map<String, String> createProps = createProps();
        createProps.put("flush.size", "2");
        DataWriter dataWriter = new DataWriter(new HdfsSinkConnectorConfig(createProps), this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        dataWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> createSinkRecordsWithAlternatingSchemas = createSinkRecordsWithAlternatingSchemas(7, 0L);
        dataWriter.write(createSinkRecordsWithAlternatingSchemas);
        dataWriter.close();
        dataWriter.stop();
        verify(createSinkRecordsWithAlternatingSchemas, new long[]{0, 1, 2, 3, 4, 5, 6});
    }

    @Test
    public void testProjectForward() throws Exception {
        Map<String, String> createProps = createProps();
        createProps.put("flush.size", "2");
        createProps.put("schema.compatibility", "FORWARD");
        DataWriter dataWriter = new DataWriter(new HdfsSinkConnectorConfig(createProps), this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        dataWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> subList = createSinkRecordsWithAlternatingSchemas(8, 0L).subList(1, 8);
        dataWriter.write(subList);
        dataWriter.close();
        dataWriter.stop();
        verify(subList, new long[]{1, 2, 4, 6, 8});
    }

    @Test
    public void testProjectNoVersion() throws Exception {
        Map<String, String> createProps = createProps();
        createProps.put("schema.compatibility", "BACKWARD");
        DataWriter dataWriter = new DataWriter(new HdfsSinkConnectorConfig(createProps), this.context, this.avroData);
        this.partitioner = dataWriter.getPartitioner();
        dataWriter.recover(TOPIC_PARTITION);
        List<SinkRecord> createSinkRecordsNoVersion = createSinkRecordsNoVersion(1, 0L);
        createSinkRecordsNoVersion.addAll(createSinkRecordsWithAlternatingSchemas(7, 0L));
        try {
            dataWriter.write(createSinkRecordsNoVersion);
            Assert.fail("Version is required for Backward compatibility.");
            dataWriter.close();
            dataWriter.stop();
            verify(Collections.emptyList(), new long[0]);
        } catch (RuntimeException e) {
            dataWriter.close();
            dataWriter.stop();
            verify(Collections.emptyList(), new long[0]);
        } catch (Throwable th) {
            dataWriter.close();
            dataWriter.stop();
            verify(Collections.emptyList(), new long[0]);
            throw th;
        }
    }

    @Test
    public void testFlushPartialFile() throws Exception {
        long longValue = Long.valueOf("1000").longValue() * 2;
        int intValue = Integer.valueOf("10").intValue() + (Integer.valueOf("10").intValue() / 2);
        Map<String, String> createProps = createProps();
        createProps.put("flush.size", "10");
        createProps.put("rotate.interval.ms", "1000");
        createProps.put("partition.duration.ms", String.valueOf(TimeUnit.DAYS.toMillis(1L)));
        createProps.put("timestamp.extractor", TopicPartitionWriterTest.MockedWallclockTimestampExtractor.class.getName());
        createProps.put("partitioner.class", TimeBasedPartitioner.class.getName());
        HdfsSinkConnectorConfig hdfsSinkConnectorConfig = new HdfsSinkConnectorConfig(createProps);
        this.context.assignment().add(TOPIC_PARTITION);
        MockTime mockTime = TopicPartitionWriterTest.MockedWallclockTimestampExtractor.TIME;
        DataWriter dataWriter = new DataWriter(hdfsSinkConnectorConfig, this.context, this.avroData, mockTime);
        this.partitioner = dataWriter.getPartitioner();
        dataWriter.recover(TOPIC_PARTITION);
        dataWriter.write(createSinkRecords(intValue));
        mockTime.sleep(longValue);
        dataWriter.write(new ArrayList());
        Map committedOffsets = dataWriter.getCommittedOffsets();
        Assert.assertTrue(committedOffsets.containsKey(TOPIC_PARTITION));
        Assert.assertEquals(intValue, ((Long) committedOffsets.get(TOPIC_PARTITION)).longValue());
        dataWriter.close();
        dataWriter.stop();
    }
}
