package io.confluent.connect.hdfs;

import io.confluent.connect.hdfs.utils.Data;
import io.confluent.connect.hdfs.utils.MemoryFormat;
import io.confluent.connect.hdfs.utils.MemoryRecordWriter;
import io.confluent.connect.hdfs.utils.MemoryStorage;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/hdfs/FailureRecoveryTest.class */
/**
 * Tests how {@link DataWriter} behaves when the storage layer or a record writer is told to
 * fail, using the in-memory {@link MemoryStorage} and {@link MemoryFormat} test doubles.
 *
 * <p>Each test injects a failure, checks that the sink task context was given a timeout equal
 * to the configured {@code retry.backoff.ms}, and then inspects the data captured in
 * {@link Data#getData()} before and after sleeping for that timeout.
 */
public class FailureRecoveryTest extends HdfsSinkConnectorTestBase {
    private static final String ZERO_PAD_FMT = "%010d";
    private static final String EXTENSION = "";

    // NOTE(review): these literals presumably mirror the topic/partitions behind the inherited
    // TOPIC_PARTITION and TOPIC_PARTITION2 constants - confirm against HdfsSinkConnectorTestBase.
    private static final String TOPIC_NAME = "test-topic";
    private static final int FIRST_PARTITION = 12;
    private static final int SECOND_PARTITION = 13;

    private static final String RECORD_KEY = "key";
    private static final String RETRY_BACKOFF_KEY = "retry.backoff.ms";

    @Override // io.confluent.connect.hdfs.HdfsSinkConnectorTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
    }

    /**
     * Extends the base connector properties so that the connector uses the in-memory storage
     * and format implementations.
     *
     * @return the base properties with {@code storage.class} and {@code format.class} overridden
     */
    @Override // io.confluent.connect.hdfs.HdfsSinkConnectorTestBase
    public Map<String, String> createProps() {
        Map<String, String> props = super.createProps();
        props.put("storage.class", MemoryStorage.class.getName());
        props.put("format.class", MemoryFormat.class.getName());
        return props;
    }

    /**
     * Writes seven records while the storage is set to {@code appendFailure}. Asserts that a
     * retry back-off is requested, that the log file holds no data before the back-off has
     * elapsed, and that it holds six entries after sleeping and writing again.
     */
    @Test
    public void testCommitFailure() throws Exception {
        Schema schema = createSchema();
        Struct value = createRecord(schema);
        List<SinkRecord> records = new ArrayList<SinkRecord>();
        appendRecords(records, FIRST_PARTITION, schema, value, 0L, 7L);

        DataWriter writer = new DataWriter(this.connectorConfig, this.context, this.avroData);
        writer.getStorage().setFailure(MemoryStorage.Failure.appendFailure);
        writer.write(records);
        assertBackoffRequested(this.connectorConfig.getLong(RETRY_BACKOFF_KEY).longValue());

        Map<String, List<Object>> data = Data.getData();
        String logFile = FileUtils.logFileName(this.url, this.logsDir, TOPIC_PARTITION);
        Assert.assertNull(data.get(logFile));

        // A write issued before the back-off has elapsed must still leave the log file empty.
        writer.write(new ArrayList<SinkRecord>());
        Assert.assertNull(data.get(logFile));

        Thread.sleep(this.context.timeout());
        writer.write(new ArrayList<SinkRecord>());
        Assert.assertEquals(6L, data.get(logFile).size());

        writer.close();
        writer.stop();
    }

    /**
     * Injects a write failure and then a close failure into the record writer of partition 12
     * while records for partitions 12 and 13 are written together. Asserts that the committed
     * files of partition 13 have the expected sizes and that the temp file of partition 12
     * grows from one to three records once the back-off has elapsed.
     */
    @Test
    public void testWriterFailureMultiPartitions() throws Exception {
        Schema schema = createSchema();
        Struct value = createRecord(schema);
        List<SinkRecord> records = new ArrayList<SinkRecord>();
        records.add(newRecord(FIRST_PARTITION, schema, value, 0L));
        records.add(newRecord(SECOND_PARTITION, schema, value, 0L));

        DataWriter writer = new DataWriter(this.connectorConfig, this.context, this.avroData);
        // The first write creates the per-partition record writers that are looked up below.
        writer.write(records);
        records.clear();

        appendRecords(records, FIRST_PARTITION, schema, value, 1L, 7L);
        appendRecords(records, SECOND_PARTITION, schema, value, 1L, 7L);

        String encodedPartition = "partition=" + FIRST_PARTITION;
        MemoryRecordWriter recordWriter =
                (MemoryRecordWriter) writer.getWriters(TOPIC_PARTITION).get(encodedPartition);
        recordWriter.setFailure(MemoryRecordWriter.Failure.writeFailure);
        writer.write(records);
        assertBackoffRequested(this.connectorConfig.getLong(RETRY_BACKOFF_KEY).longValue());

        Map<String, List<Object>> data = Data.getData();
        String secondPartitionDir = TOPIC_NAME + "/partition=" + SECOND_PARTITION;
        // Consecutive entries delimit the committed files: (prev + 1) .. current, inclusive.
        long[] endOffsets = {-1, 2, 5};
        for (int i = 1; i < endOffsets.length; i++) {
            long startOffset = endOffsets[i - 1] + 1;
            long endOffset = endOffsets[i];
            String committedFile = FileUtils.committedFileName(
                    this.url, this.topicsDir, secondPartitionDir, TOPIC_PARTITION2,
                    startOffset, endOffset, EXTENSION, ZERO_PAD_FMT);
            Assert.assertEquals(endOffset - startOffset + 1, data.get(committedFile).size());
        }

        recordWriter.setFailure(MemoryRecordWriter.Failure.closeFailure);
        writer.write(new ArrayList<SinkRecord>());
        assertBackoffRequested(this.connectorConfig.getLong(RETRY_BACKOFF_KEY).longValue());

        String tempFile = (String) writer.getTempFileNames(TOPIC_PARTITION).get(encodedPartition);
        List<Object> tempContent = data.get(tempFile);
        Assert.assertEquals(1L, tempContent.size());
        assertSequentialRecords(tempContent, schema, value);

        Thread.sleep(this.context.timeout());
        writer.write(new ArrayList<SinkRecord>());
        // The same list instance is inspected again on purpose; it is not re-fetched here.
        Assert.assertEquals(3L, tempContent.size());
        assertSequentialRecords(tempContent, schema, value);

        writer.write(new ArrayList<SinkRecord>());
        writer.close();
        writer.stop();
    }

    /**
     * Single-partition variant: injects a write failure and then a close failure into the
     * record writer of partition 12, using a config built from {@code this.properties}.
     * Asserts that the temp file holds one record before the back-off has elapsed and three
     * records afterwards.
     */
    @Test
    public void testWriterFailure() throws Exception {
        HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(this.properties);
        Schema schema = createSchema();
        Struct value = createRecord(schema);
        List<SinkRecord> records = new ArrayList<SinkRecord>();
        records.add(newRecord(FIRST_PARTITION, schema, value, 0L));

        DataWriter writer = new DataWriter(config, this.context, this.avroData);
        // The first write creates the record writer that is looked up below.
        writer.write(records);
        records.clear();

        appendRecords(records, FIRST_PARTITION, schema, value, 1L, 7L);

        String encodedPartition = "partition=" + FIRST_PARTITION;
        MemoryRecordWriter recordWriter =
                (MemoryRecordWriter) writer.getWriters(TOPIC_PARTITION).get(encodedPartition);
        recordWriter.setFailure(MemoryRecordWriter.Failure.writeFailure);
        writer.write(records);
        assertBackoffRequested(config.getLong(RETRY_BACKOFF_KEY).longValue());

        recordWriter.setFailure(MemoryRecordWriter.Failure.closeFailure);
        writer.write(new ArrayList<SinkRecord>());

        Map<String, List<Object>> data = Data.getData();
        String tempFile = (String) writer.getTempFileNames(TOPIC_PARTITION).get(encodedPartition);
        List<Object> tempContent = data.get(tempFile);
        Assert.assertEquals(1L, tempContent.size());
        assertSequentialRecords(tempContent, schema, value);

        Thread.sleep(this.context.timeout());
        writer.write(new ArrayList<SinkRecord>());

        // Unlike the multi-partition test, the temp file name and its content are looked up again.
        String tempFileAfterRetry =
                (String) writer.getTempFileNames(TOPIC_PARTITION).get(encodedPartition);
        List<Object> contentAfterRetry = data.get(tempFileAfterRetry);
        Assert.assertEquals(3L, contentAfterRetry.size());
        assertSequentialRecords(contentAfterRetry, schema, value);

        writer.write(new ArrayList<SinkRecord>());
        writer.close();
        writer.stop();
    }

    /** Builds a sink record for the test topic with a string key and the given struct value. */
    private static SinkRecord newRecord(int partition, Schema valueSchema, Struct value, long offset) {
        return new SinkRecord(
                TOPIC_NAME, partition, Schema.STRING_SCHEMA, RECORD_KEY, valueSchema, value, offset);
    }

    /** Appends one record per offset in {@code [fromOffset, toOffsetExclusive)} to {@code target}. */
    private static void appendRecords(List<SinkRecord> target, int partition, Schema valueSchema,
            Struct value, long fromOffset, long toOffsetExclusive) {
        for (long offset = fromOffset; offset < toOffsetExclusive; offset++) {
            target.add(newRecord(partition, valueSchema, value, offset));
        }
    }

    /** Asserts that the timeout recorded on the sink task context equals the expected back-off. */
    private void assertBackoffRequested(long expectedBackoffMs) {
        Assert.assertEquals(expectedBackoffMs, this.context.timeout());
    }

    /**
     * Asserts that element {@code i} of {@code content} equals the partition-12 record at
     * offset {@code i}.
     */
    private static void assertSequentialRecords(List<Object> content, Schema valueSchema, Struct value) {
        for (int offset = 0; offset < content.size(); offset++) {
            Assert.assertEquals(
                    newRecord(FIRST_PARTITION, valueSchema, value, offset), content.get(offset));
        }
    }
}
