package io.confluent.connect.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.s3.util.FileUtils;
import io.confluent.connect.storage.format.Format;
import io.confluent.connect.storage.format.RecordWriter;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import io.confluent.connect.storage.partitioner.Partitioner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.powermock.api.mockito.PowerMockito;

/* loaded from: input_file:io/confluent/connect/s3/DataWriterTestBase.class */
public abstract class DataWriterTestBase<FORMAT extends Format<S3SinkConnectorConfig, String>> extends TestWithMockedS3 {
    protected static final String ZERO_PAD_FMT = "%010d";
    protected S3Storage storage;
    protected AmazonS3 s3;
    protected Partitioner<?> partitioner;
    protected S3SinkTask task;
    protected Map<String, String> localProps = new HashMap();
    protected FORMAT format;
    private final Class<FORMAT> clazz;

    protected abstract String getFileExtension();

    protected abstract void verify(List<SinkRecord> list, long[] jArr, Set<TopicPartition> set) throws IOException;

    protected abstract List<SinkRecord> createGenericRecords(int i, long j);

    protected abstract String getDirectory(String str, int i);

    /* JADX INFO: Access modifiers changed from: protected */
    public DataWriterTestBase(Class<FORMAT> cls) {
        this.clazz = cls;
    }

    @Override // io.confluent.connect.s3.TestWithMockedS3, io.confluent.connect.s3.S3SinkConnectorTestBase
    public void setUp() throws Exception {
        this.localProps.put("format.class", this.clazz.getName());
        super.setUp();
        this.s3 = (AmazonS3) PowerMockito.spy(newS3Client(this.connectorConfig));
        this.storage = new S3Storage(this.connectorConfig, this.url, "kafka.bucket", this.s3);
        this.partitioner = new DefaultPartitioner();
        this.partitioner.configure(this.parsedConfig);
        this.format = this.clazz.getDeclaredConstructor(S3Storage.class).newInstance(this.storage);
        Assert.assertEquals(this.format.getClass().getName(), this.clazz.getName());
        this.s3.createBucket("kafka.bucket");
        Assert.assertTrue(this.s3.doesBucketExistV2("kafka.bucket"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<String> getExpectedFiles(long[] jArr, TopicPartition topicPartition, String str) {
        ArrayList arrayList = new ArrayList();
        for (int i = 1; i < jArr.length; i++) {
            arrayList.add(FileUtils.fileKeyToCommit(this.topicsDir, getDirectory(topicPartition.topic(), topicPartition.partition()), topicPartition, jArr[i - 1], str, ZERO_PAD_FMT));
        }
        return arrayList;
    }

    protected List<String> getExpectedFiles(long[] jArr, Collection<TopicPartition> collection, String str) {
        ArrayList arrayList = new ArrayList();
        Iterator<TopicPartition> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.addAll(getExpectedFiles(jArr, it.next(), str));
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void verifyFileListing(List<String> list) throws IOException {
        List<S3ObjectSummary> listObjects = listObjects("kafka.bucket", null, this.s3);
        ArrayList arrayList = new ArrayList();
        Iterator<S3ObjectSummary> it = listObjects.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getKey());
        }
        Assertions.assertThat(arrayList).containsExactlyInAnyOrderElementsOf(list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void verifyFileListing(long[] jArr, Collection<TopicPartition> collection, String str) throws IOException {
        verifyFileListing(getExpectedFiles(jArr, collection, str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void testCorrectRecordWriterHelper(String str) throws Exception {
        this.localProps.put("topics.dir", str);
        setUp();
        this.task = new S3SinkTask(this.connectorConfig, this.context, this.storage, this.partitioner, this.format, SYSTEM_TIME);
        KeyValueHeaderRecordWriterProvider newRecordWriterProvider = this.task.newRecordWriterProvider(this.connectorConfig);
        Assert.assertEquals(newRecordWriterProvider.getExtension(), getFileExtension());
        Assertions.assertThat(newRecordWriterProvider).isInstanceOf(KeyValueHeaderRecordWriterProvider.class);
        KeyValueHeaderRecordWriterProvider keyValueHeaderRecordWriterProvider = newRecordWriterProvider;
        Set<TopicPartition> hashSet = new HashSet<>();
        List<SinkRecord> createGenericRecords = createGenericRecords(1, 1L);
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        for (SinkRecord sinkRecord : createGenericRecords) {
            TopicPartition topicPartition = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue());
            hashSet.add(topicPartition);
            Long l = (Long) hashMap.getOrDefault(Integer.valueOf(topicPartition.partition()), 0L);
            hashMap.put(Integer.valueOf(topicPartition.partition()), Long.valueOf(l.longValue() + 1));
            String fileKeyToCommit = FileUtils.fileKeyToCommit(this.topicsDir, getDirectory(topicPartition.topic(), topicPartition.partition()), topicPartition, l.longValue(), getFileExtension(), ZERO_PAD_FMT);
            arrayList.add(fileKeyToCommit);
            RecordWriter recordWriter = keyValueHeaderRecordWriterProvider.getRecordWriter(this.connectorConfig, fileKeyToCommit);
            recordWriter.write(sinkRecord);
            recordWriter.commit();
            recordWriter.close();
        }
        long[] jArr = new long[2];
        for (int i = 0; i <= 1; i++) {
            jArr[i] = i;
        }
        Assertions.assertThat(arrayList).containsExactlyInAnyOrderElementsOf(getExpectedFiles(jArr, hashSet, getFileExtension()));
        verify(createGenericRecords, jArr, hashSet);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Collection<TopicPartition> sortedPartitions(Collection<TopicPartition> collection) {
        TreeMap treeMap = new TreeMap();
        for (TopicPartition topicPartition : collection) {
            if (treeMap.containsKey(Integer.valueOf(topicPartition.partition()))) {
                throw new RuntimeException("A duplicate partition number not expected.");
            }
            treeMap.put(Integer.valueOf(topicPartition.partition()), topicPartition);
        }
        return treeMap.values();
    }
}
