/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs;

import io.confluent.common.utils.MockTime;
import io.confluent.common.utils.Time;
import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.HdfsSinkConnectorTestBase;
import io.confluent.connect.hdfs.utils.Data;
import io.confluent.connect.hdfs.utils.MemoryFormat;
import io.confluent.connect.hdfs.utils.MemoryRecordWriter;
import io.confluent.connect.hdfs.utils.MemoryStorage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class FailureRecoveryTest
extends HdfsSinkConnectorTestBase {
    private static final String ZERO_PAD_FMT = "%010d";
    private static final String extension = "";
    private Map<String, String> localProps = new HashMap<String, String>();
    private MockTime time;

    /**
     * Prepares the mock clock and then runs the base-class fixture setup.
     *
     * <p>A new {@link MockTime} is created and advanced by the value of
     * {@link System#currentTimeMillis()}; it is stored in {@code time} before
     * {@code super.setUp()} is invoked.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        MockTime mockClock = new MockTime();
        mockClock.sleep(System.currentTimeMillis());
        this.time = mockClock;
        super.setUp();
    }

    /**
     * Builds the connector properties used by these tests.
     *
     * <p>Starts from the base-class properties, points {@code storage.class} and
     * {@code format.class} at the in-memory test implementations, and finally applies any
     * per-test entries from {@code localProps} (which therefore win on key collisions).
     *
     * @return the property map obtained from the base class, with the test overrides applied
     */
    @Override
    protected Map<String, String> createProps() {
        Map<String, String> merged = super.createProps();
        merged.put("storage.class", MemoryStorage.class.getName());
        merged.put("format.class", MemoryFormat.class.getName());
        // Per-test overrides are applied last so they take precedence.
        this.localProps.forEach(merged::put);
        return merged;
    }

    /**
     * Exercises recovery after an injected storage append failure.
     *
     * <p>With {@code MemoryStorage.Failure.appendFailure} set, writing seven records must leave
     * the context timeout equal to {@code retry.backoff.ms} and no content under the log file
     * name. A second (empty) write issued before the mock clock advances must still leave the
     * log file absent. After the clock is advanced by the context timeout, the next write is
     * expected to leave six entries in the log file.
     */
    @Test
    public void testCommitFailure() {
        List<SinkRecord> sinkRecords = this.createRecords(12, 0, 7);
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext) this.context, this.avroData, (Time) this.time);
        MemoryStorage storage = (MemoryStorage) hdfsWriter.getStorage();
        storage.setFailure(MemoryStorage.Failure.appendFailure);

        hdfsWriter.write(sinkRecords);
        // assertEquals takes (expected, actual): the configured backoff is the expectation.
        Assert.assertEquals(
                (long) this.connectorConfig.getLong("retry.backoff.ms"),
                (long) this.context.timeout());

        Map<String, List<Object>> data = Data.getData();
        String logFile = FileUtils.logFileName((String) this.url, (String) this.logsDir, (TopicPartition) TOPIC_PARTITION);
        List<Object> content = data.get(logFile);
        Assert.assertNull("log file must not exist right after the failed append", content);

        // Still inside the backoff window: the retry must not have happened yet.
        hdfsWriter.write(new ArrayList<SinkRecord>());
        content = data.get(logFile);
        Assert.assertNull("log file must not exist before the backoff has elapsed", content);

        // Move the mock clock past the backoff so the next write can retry.
        this.time.sleep(this.context.timeout());
        hdfsWriter.write(new ArrayList<SinkRecord>());
        content = data.get(logFile);
        // Fail with a readable message instead of a NullPointerException on size().
        Assert.assertNotNull("log file should exist after the retry", content);
        Assert.assertEquals(6L, (long) content.size());

        hdfsWriter.close();
        hdfsWriter.stop();
    }

    /**
     * Exercises an append failure that occurs after the scheduled rotation interval has passed.
     *
     * <p>Sequence: configure a ten-minute {@code rotate.schedule.interval.ms}, write offsets 0-3,
     * advance the mock clock by twice the rotation interval, inject
     * {@code MemoryStorage.Failure.appendFailure}, and issue an empty write. The context timeout
     * must then equal {@code retry.backoff.ms}. After advancing the clock by that timeout and
     * clearing the failure, offsets 4-6 are written and the committed files for the offset
     * ranges [0,2], [3,3] and [4,6] must each exist with one entry per offset.
     *
     * @throws Exception if re-running the fixture setup fails
     */
    @Test
    public void testRotateAppendFailure() throws Exception {
        this.localProps.put("rotate.schedule.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(10L)));
        // The @Before hook has already run once; setUp() is invoked again after changing
        // localProps, presumably so the base class rebuilds its config through createProps()
        // (verify against HdfsSinkConnectorTestBase).
        this.setUp();

        String key = "key";
        Schema schema = this.createSchema();
        Struct record = this.createRecord(schema);
        List<SinkRecord> sinkRecordsA = new ArrayList<SinkRecord>();
        List<SinkRecord> sinkRecordsB = new ArrayList<SinkRecord>();
        for (long offset = 0L; offset < 7L; ++offset) {
            SinkRecord sinkRecord = new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, key, schema, record, offset);
            // Offsets 0-3 go into the first batch, 4-6 into the second.
            if (offset < 4L) {
                sinkRecordsA.add(sinkRecord);
            } else {
                sinkRecordsB.add(sinkRecord);
            }
        }

        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext) this.context, this.avroData, (Time) this.time);
        MemoryStorage storage = (MemoryStorage) hdfsWriter.getStorage();
        hdfsWriter.recover(TOPIC_PARTITION);
        hdfsWriter.write(sinkRecordsA);

        // Jump well past the scheduled rotation interval, then make the next append fail.
        this.time.sleep(2L * this.connectorConfig.getLong("rotate.schedule.interval.ms"));
        storage.setFailure(MemoryStorage.Failure.appendFailure);
        Data.logContents("Before failure");
        hdfsWriter.write(new ArrayList<SinkRecord>());
        Data.logContents("After failure");
        // assertEquals takes (expected, actual): the configured backoff is the expectation.
        Assert.assertEquals(
                (long) this.connectorConfig.getLong("retry.backoff.ms"),
                (long) this.context.timeout());

        // Let the backoff elapse, clear the injected failure, and write the second batch.
        this.time.sleep(this.context.timeout());
        storage.setFailure(null);
        hdfsWriter.write(sinkRecordsB);
        Data.logContents("After test");

        // Consecutive entries delimit the expected committed files: (prev + 1) .. current.
        long[] validOffsets = new long[]{-1L, 2L, 3L, 6L};
        for (int i = 1; i < validOffsets.length; ++i) {
            long startOffset = validOffsets[i - 1] + 1L;
            long endOffset = validOffsets[i];
            String path = FileUtils.committedFileName((String) this.url, (String) this.topicsDir.getOrDefault("test-topic", "topics"), (String) "test-topic/partition=12", (TopicPartition) TOPIC_PARTITION, (long) startOffset, (long) endOffset, (String) extension, (String) ZERO_PAD_FMT);
            long size = endOffset - startOffset + 1L;
            List<Object> records = Data.getData().get(path);
            Assert.assertNotNull(path + " should have been created", records);
            Assert.assertEquals(path + " should contain a full batch of records", size, (long) records.size());
        }

        hdfsWriter.close();
        hdfsWriter.stop();
    }

    /**
     * Exercises a record-writer failure on one partition while a second partition keeps working.
     *
     * <p>One record is written to each of partitions 12 and 13, then
     * {@code MemoryRecordWriter.Failure.writeFailure} is injected into the partition-12 writer
     * and six more records per partition are written. The test asserts that the context timeout
     * equals {@code retry.backoff.ms}, that partition 13 has committed files for offsets [0,2]
     * and [3,5], and that the partition-12 temp file holds exactly the first record. After the
     * mock clock is advanced by the timeout and the partition-13 records are written again, the
     * previously fetched partition-12 temp-file content is expected to hold three records.
     */
    @Test
    public void testWriterFailureMultiPartitions() {
        List<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        sinkRecords.add(this.createRecord(12, 0));
        sinkRecords.add(this.createRecord(13, 0));
        DataWriter hdfsWriter = new DataWriter(this.connectorConfig, (SinkTaskContext) this.context, this.avroData, (Time) this.time);
        hdfsWriter.write(sinkRecords);

        // Indices 0-5 are partition 12 (offsets 1-6); indices 6-11 are partition 13 (offsets 1-6).
        sinkRecords.clear();
        sinkRecords.addAll(this.createRecords(12, 1, 6));
        sinkRecords.addAll(this.createRecords(13, 1, 6));

        String encodedPartition = "partition=12";
        Map<?, ?> writers = hdfsWriter.getWriters(TOPIC_PARTITION);
        MemoryRecordWriter writer = (MemoryRecordWriter) writers.get(encodedPartition);
        writer.setFailure(MemoryRecordWriter.Failure.writeFailure);
        hdfsWriter.write(sinkRecords);
        // assertEquals takes (expected, actual): the configured backoff is the expectation.
        Assert.assertEquals(
                (long) this.connectorConfig.getLong("retry.backoff.ms"),
                (long) this.context.timeout());

        Map<String, List<Object>> data = Data.getData();
        String directory2 = "test-topic/partition=13";
        // Looked up once: the value does not depend on the loop variable.
        String partition2TopicsDir = (String) this.topicsDir.get(TOPIC_PARTITION2.topic());
        // Consecutive entries delimit the expected committed files: (prev + 1) .. current.
        long[] validOffsets = new long[]{-1L, 2L, 5L};
        for (int i = 1; i < validOffsets.length; ++i) {
            long startOffset = validOffsets[i - 1] + 1L;
            long endOffset = validOffsets[i];
            String path = FileUtils.committedFileName((String) this.url, (String) partition2TopicsDir, (String) directory2, (TopicPartition) TOPIC_PARTITION2, (long) startOffset, (long) endOffset, (String) extension, (String) ZERO_PAD_FMT);
            long size = endOffset - startOffset + 1L;
            List<Object> records = data.get(path);
            // Fail with a readable message instead of a NullPointerException on size().
            Assert.assertNotNull(path + " should have been created", records);
            Assert.assertEquals(path + " should contain a full batch of records", size, (long) records.size());
        }

        hdfsWriter.write(new ArrayList<SinkRecord>());
        Assert.assertEquals(
                (long) this.connectorConfig.getLong("retry.backoff.ms"),
                (long) this.context.timeout());

        Map<?, ?> tempFileNames = hdfsWriter.getTempFileNames(TOPIC_PARTITION);
        String tempFileName = (String) tempFileNames.get(encodedPartition);
        List<Object> content = data.get(tempFileName);
        Assert.assertNotNull("temp file for " + encodedPartition + " should exist", content);
        Assert.assertEquals(1L, (long) content.size());
        for (int i = 0; i < content.size(); ++i) {
            Assert.assertEquals((Object) this.createRecord(12, i), (Object) content.get(i));
        }

        // Let the backoff elapse, then resend only the partition-13 slice of the batch.
        this.time.sleep(this.context.timeout());
        hdfsWriter.write(sinkRecords.subList(6, 12));
        // 'content' is the list fetched before the retry; the assertion reads it again as-is.
        Assert.assertEquals(3L, (long) content.size());
        for (int i = 0; i < content.size(); ++i) {
            Assert.assertEquals((Object) this.createRecord(12, i), (Object) content.get(i));
        }

        hdfsWriter.write(new ArrayList<SinkRecord>());
        hdfsWriter.close();
        hdfsWriter.stop();
    }

    /**
     * Exercises recovery after an injected record-writer write failure on a single partition.
     *
     * <p>One record (offset 0) is written, then {@code MemoryRecordWriter.Failure.writeFailure}
     * is injected into the partition-12 writer and offsets 1-6 are written. The context timeout
     * must equal {@code retry.backoff.ms}, and the temp file must still hold only the offset-0
     * record. After the mock clock is advanced by the timeout and an empty write is issued, the
     * current temp file is expected to hold a single record equal to the offset-6 record.
     */
    @Test
    public void testWriterFailure() {
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(this.properties);
        List<SinkRecord> sinkRecords = this.createRecords(12, 0, 1);
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext) this.context, this.avroData, (Time) this.time);
        hdfsWriter.write(sinkRecords);

        sinkRecords = this.createRecords(12, 1, 6);
        String encodedPartition = "partition=12";
        Map<?, ?> writers = hdfsWriter.getWriters(TOPIC_PARTITION);
        MemoryRecordWriter writer = (MemoryRecordWriter) writers.get(encodedPartition);
        writer.setFailure(MemoryRecordWriter.Failure.writeFailure);
        hdfsWriter.write(sinkRecords);
        // assertEquals takes (expected, actual): the configured backoff is the expectation.
        Assert.assertEquals(
                (long) connectorConfig.getLong("retry.backoff.ms"),
                (long) this.context.timeout());

        // Still inside the backoff window: only the record written before the failure is present.
        hdfsWriter.write(new ArrayList<SinkRecord>());
        Map<String, List<Object>> data = Data.getData();
        Map<?, ?> tempFileNames = hdfsWriter.getTempFileNames(TOPIC_PARTITION);
        String tempFileName = (String) tempFileNames.get(encodedPartition);
        List<Object> content = data.get(tempFileName);
        // Fail with a readable message instead of a NullPointerException on size().
        Assert.assertNotNull("temp file should exist before the retry", content);
        Assert.assertEquals(1L, (long) content.size());
        Assert.assertEquals((Object) this.createRecord(12, 0), (Object) content.get(0));

        // Let the backoff elapse and trigger the retry with an empty write.
        this.time.sleep(this.context.timeout());
        hdfsWriter.write(new ArrayList<SinkRecord>());
        // The temp file name is looked up again rather than reused from before the retry.
        tempFileNames = hdfsWriter.getTempFileNames(TOPIC_PARTITION);
        tempFileName = (String) tempFileNames.get(encodedPartition);
        content = data.get(tempFileName);
        Assert.assertNotNull("temp file should exist after the retry", content);
        Assert.assertEquals(1L, (long) content.size());
        SinkRecord refSinkRecord = this.createRecord(12, 6);
        Assert.assertEquals((Object) refSinkRecord, (Object) content.get(0));

        hdfsWriter.write(new ArrayList<SinkRecord>());
        hdfsWriter.close();
        hdfsWriter.stop();
    }

    /**
     * Exercises recovery after an injected record-writer close failure.
     *
     * <p>One record (offset 0) is written, then {@code MemoryRecordWriter.Failure.closeFailure}
     * is injected into the partition-12 writer and offsets 1-6 are written; the context timeout
     * must equal {@code retry.backoff.ms}. Offsets 0-6 are then written again, the mock clock is
     * advanced by the timeout, and after an empty write the temp file is expected to hold a
     * single record equal to the offset-6 record. The close failure is injected a second time,
     * offsets 7-8 are written, the clock is advanced again, and the committed offset for
     * partition 12 of {@code test-topic} is expected to be 6.
     *
     * @throws Exception declared by the original test; no checked exception is thrown by the
     *     code visible here
     */
    @Test
    public void testCloseFailure() throws Exception {
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(this.properties);
        List<SinkRecord> sinkRecords = this.createRecords(12, 0, 1);
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext) this.context, this.avroData, (Time) this.time);
        hdfsWriter.write(sinkRecords);

        sinkRecords = this.createRecords(12, 1, 6);
        String encodedPartition = "partition=12";
        Map<?, ?> writers = hdfsWriter.getWriters(TOPIC_PARTITION);
        MemoryRecordWriter writer = (MemoryRecordWriter) writers.get(encodedPartition);
        writer.setFailure(MemoryRecordWriter.Failure.closeFailure);
        hdfsWriter.write(sinkRecords);
        // assertEquals takes (expected, actual): the configured backoff is the expectation.
        Assert.assertEquals(
                (long) connectorConfig.getLong("retry.backoff.ms"),
                (long) this.context.timeout());

        // Resend everything from offset 0, then let the backoff elapse and trigger the retry.
        sinkRecords = this.createRecords(12, 0, 7);
        hdfsWriter.write(sinkRecords);
        this.time.sleep(this.context.timeout());
        hdfsWriter.write(new ArrayList<SinkRecord>());

        Map<String, List<Object>> data = Data.getData();
        Map<?, ?> tempFileNames = hdfsWriter.getTempFileNames(TOPIC_PARTITION);
        String tempFileName = (String) tempFileNames.get(encodedPartition);
        List<Object> content = data.get(tempFileName);
        // Fail with a readable message instead of a NullPointerException on size().
        Assert.assertNotNull("temp file should exist after the retry", content);
        Assert.assertEquals(1L, (long) content.size());
        SinkRecord refSinkRecord = this.createRecord(12, 6);
        Assert.assertEquals((Object) refSinkRecord, (Object) content.get(0));

        // Inject the close failure again; the lookup reuses the writers map fetched earlier.
        writer = (MemoryRecordWriter) writers.get(encodedPartition);
        writer.setFailure(MemoryRecordWriter.Failure.closeFailure);
        hdfsWriter.write(this.createRecords(12, 7, 2));
        this.time.sleep(this.context.timeout());
        hdfsWriter.write(new ArrayList<SinkRecord>());

        Long committedOffset = (Long) hdfsWriter.getCommittedOffsets().get(new TopicPartition("test-topic", 12));
        // A missing entry should be an assertion failure, not an unboxing NullPointerException.
        Assert.assertNotNull("a committed offset should be recorded for test-topic/12", committedOffset);
        Assert.assertEquals(6L, committedOffset.longValue());

        hdfsWriter.close();
        hdfsWriter.stop();
    }

    /**
     * Builds a single sink record for topic {@code test-topic}.
     *
     * <p>The key is the string {@code "key"} with {@link Schema#STRING_SCHEMA}; the value schema
     * and value come from {@code createSchema()} and {@code createRecord(Schema)}.
     *
     * @param partition Kafka partition to stamp on the record
     * @param offset Kafka offset to stamp on the record
     * @return the newly constructed record
     */
    private SinkRecord createRecord(int partition, int offset) {
        final Schema valueSchema = this.createSchema();
        final Struct value = this.createRecord(valueSchema);
        final long kafkaOffset = offset;
        return new SinkRecord("test-topic", partition, Schema.STRING_SCHEMA, "key", valueSchema, value, kafkaOffset);
    }

    /**
     * Builds a run of sink records with consecutive offsets for one partition.
     *
     * @param partition Kafka partition to stamp on every record
     * @param startOffset offset of the first record
     * @param numRecords number of records to create; the offsets cover
     *     {@code startOffset} (inclusive) to {@code startOffset + numRecords} (exclusive)
     * @return a new mutable list of the created records, in ascending offset order
     */
    private ArrayList<SinkRecord> createRecords(int partition, int startOffset, int numRecords) {
        ArrayList<SinkRecord> batch = new ArrayList<SinkRecord>();
        final int endExclusive = numRecords + startOffset;
        int offset = startOffset;
        while (offset < endExclusive) {
            batch.add(this.createRecord(partition, offset));
            offset++;
        }
        return batch;
    }
}

