/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs.string;

import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
import io.confluent.connect.hdfs.string.StringDataFileReader;
import io.confluent.connect.hdfs.string.StringFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises {@link DataWriter} configured with {@link StringFormat}: plain string records are
 * written and then checked through the base class's {@code verify} using a
 * {@link StringDataFileReader} and the {@code .txt} file extension.
 */
public class DataWriterStringTest
extends TestWithMiniDFSCluster {
    /**
     * Runs the base-class setup, then installs the string file reader and the file extension
     * used by this test.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        dataFileReader = new StringDataFileReader();
        extension = ".txt";
    }

    /**
     * Returns the base-class properties with {@code format.class} set to {@link StringFormat}.
     *
     * @return the property map produced by the base class, with the format class added
     */
    @Override
    protected Map<String, String> createProps() {
        Map<String, String> props = super.createProps();
        props.put("format.class", StringFormat.class.getName());
        return props;
    }

    /**
     * Writes seven string records per assigned partition through a {@link DataWriter}, closes
     * and stops the writer, and then delegates the checks to the base class's {@code verify}.
     *
     * @throws Exception if writing or verification fails
     */
    @Test
    public void testReadString() throws Exception {
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext) context, avroData);
        partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);

        List<SinkRecord> sinkRecords =
                createStringRecords(7 * context.assignment().size(), context.assignment());
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();

        // NOTE(review): presumably the offset boundaries of the files expected on disk;
        // the exact meaning is defined by verify() in the base class - confirm there.
        long[] validOffsets = new long[]{0L, 3L, 6L};
        verify(sinkRecords, validOffsets, context.assignment());
    }

    /**
     * Builds {@code size} schemaless string records for topic {@code test-topic}.
     *
     * <p>Records are produced offset by offset: for each offset, one record is added per
     * partition (in the set's iteration order) until {@code size} records exist. Every record
     * uses the key {@code "key"} and a value that embeds its offset, so records sharing an
     * offset carry identical values regardless of partition.
     *
     * @param size total number of records to create; a non-positive value yields an empty list
     * @param partitions partitions to spread the records over
     * @return the generated records, ordered by offset and then by partition iteration order
     * @throws IllegalArgumentException if {@code size} is positive but {@code partitions} is
     *     empty, since no record could ever be produced
     */
    protected List<SinkRecord> createStringRecords(int size, Set<TopicPartition> partitions) {
        // Without this guard the loop below would spin forever: an empty partition set never
        // advances the record count towards the requested size.
        if (size > 0 && partitions.isEmpty()) {
            throw new IllegalArgumentException(
                    "Cannot create " + size + " records without any partitions");
        }

        String key = "key";
        List<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        long total = 0L;
        for (long offset = 0L; total < size; ++offset) {
            for (TopicPartition tp : partitions) {
                String record = "Some random text. Offset: " + offset;
                sinkRecords.add(new SinkRecord("test-topic", tp.partition(), null, key, null, record, offset));
                // Stop mid-offset once the requested number of records has been reached.
                if (++total >= size) {
                    break;
                }
            }
        }
        return sinkRecords;
    }

    /**
     * Asserts that each record read back equals the value of the matching expected record.
     *
     * @param expectedRecords the records that were handed to the writer
     * @param startIndex index of the first expected record for this batch, before scaling
     * @param records the values read back
     */
    @Override
    protected void verifyContents(List<SinkRecord> expectedRecords, int startIndex, Collection<Object> records) {
        int index = startIndex;
        for (Object record : records) {
            // NOTE(review): the factor of 2 looks like it skips over records interleaved
            // across exactly two partitions by createStringRecords - confirm that the
            // assignment in the base class always holds two partitions.
            Object expectedValue = expectedRecords.get(2 * index).value();
            Assert.assertEquals(expectedValue, record);
            index++;
        }
    }
}

