package io.confluent.connect.s3.integration;

import io.confluent.connect.s3.S3SinkConnector;
import io.confluent.connect.s3.S3SinkConnectorConfig;
import io.confluent.connect.s3.format.avro.AvroFormat;
import io.confluent.connect.s3.format.json.JsonFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.s3.util.EmbeddedConnectUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.IntegrationTest;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/s3/integration/S3SinkConnectorIT.class */
public class S3SinkConnectorIT extends BaseConnectorIT {
    private static final String CONNECTOR_NAME = "s3-sink";
    private static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name";
    private static final String DLQ_TOPIC_NAME = "DLQ-topic";
    private JsonConverter jsonConverter;
    private Producer<byte[], byte[]> producer;
    private static final Logger log = LoggerFactory.getLogger(S3SinkConnectorIT.class);
    private static final String DEFAULT_TEST_TOPIC_NAME = "TestTopic";
    private static final List<String> KAFKA_TOPICS = Collections.singletonList(DEFAULT_TEST_TOPIC_NAME);
    private static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(10);

    @Before
    public void before() throws InterruptedException {
        initializeJsonConverter();
        initializeCustomProducer();
        setupProperties();
        waitForSchemaRegistryToStart();
        this.props.put("topics", String.join(",", KAFKA_TOPICS));
        this.props.put("flush.size", Integer.toString(3));
        this.props.put("format.class", AvroFormat.class.getName());
        this.props.put("storage.class", S3Storage.class.getName());
        this.props.put("s3.bucket.name", TEST_BUCKET_NAME);
        KAFKA_TOPICS.forEach(str -> {
            this.connect.kafka().createTopic(str, 1);
        });
    }

    @After
    public void after() throws Exception {
        FileUtils.deleteDirectory(new File("src/test/resources/downloaded-files/"));
        clearBucket(TEST_BUCKET_NAME);
        waitForFilesInBucket(TEST_BUCKET_NAME, 0);
    }

    @Test
    public void testBasicRecordsWrittenAvro() throws Throwable {
        this.props.put("format.class", AvroFormat.class.getName());
        testBasicRecordsWritten("avro", false);
    }

    @Test
    public void testBasicRecordsWrittenParquet() throws Throwable {
        this.props.put("format.class", ParquetFormat.class.getName());
        testBasicRecordsWritten("snappy.parquet", false);
    }

    @Test
    public void testBasicRecordsWrittenJson() throws Throwable {
        this.props.put("format.class", JsonFormat.class.getName());
        testBasicRecordsWritten("json", false);
    }

    @Test
    public void testFilesWrittenToBucketAvroWithExtInTopic() throws Throwable {
        this.props.put("format.class", AvroFormat.class.getName());
        testBasicRecordsWritten("avro", true);
    }

    @Test
    public void testFilesWrittenToBucketParquetWithExtInTopic() throws Throwable {
        this.props.put("format.class", ParquetFormat.class.getName());
        testBasicRecordsWritten("snappy.parquet", true);
    }

    @Test
    public void testFilesWrittenToBucketJsonWithExtInTopic() throws Throwable {
        this.props.put("format.class", JsonFormat.class.getName());
        testBasicRecordsWritten("json", true);
    }

    private void testBasicRecordsWritten(String str, boolean z) throws Throwable {
        String str2 = "other." + str + ".topic." + str;
        TreeSet treeSet = new TreeSet(KAFKA_TOPICS);
        if (z) {
            treeSet.add(str2);
            this.connect.kafka().createTopic(str2, 1);
            this.props.replace("topics", this.props.get("topics") + "," + str2);
        }
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        EmbeddedConnectUtils.waitForConnectorToStart(this.connect, CONNECTOR_NAME, Math.min(treeSet.size(), 3));
        Schema sampleStructSchema = getSampleStructSchema();
        Struct sampleStructVal = getSampleStructVal(sampleStructSchema);
        Iterator it = treeSet.iterator();
        while (it.hasNext()) {
            produceRecordsNoHeaders(30, getSampleTopicRecord((String) it.next(), sampleStructSchema, sampleStructVal));
        }
        log.info("Waiting for files in S3...");
        int size = 10 * treeSet.size();
        waitForFilesInBucket(TEST_BUCKET_NAME, size);
        TreeSet treeSet2 = new TreeSet();
        Iterator it2 = treeSet.iterator();
        while (it2.hasNext()) {
            List<String> expectedFilenames = getExpectedFilenames((String) it2.next(), 0, 3, 30L, str);
            Assert.assertEquals(expectedFilenames.size(), 10);
            treeSet2.addAll(expectedFilenames);
        }
        Assert.assertEquals(treeSet2.size(), size);
        assertFileNamesValid(TEST_BUCKET_NAME, new ArrayList(treeSet2));
        Assert.assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, 3, sampleStructVal));
    }

    @Test
    public void testFaultyRecordsReportedToDLQ() throws Throwable {
        this.props.put("key.converter", StringConverter.class.getName());
        this.props.put("behavior.on.null.values", S3SinkConnectorConfig.BehaviorOnNullValues.IGNORE.toString());
        this.props.put("store.kafka.keys", "true");
        this.props.put("store.kafka.headers", "true");
        this.props.put(DLQ_TOPIC_CONFIG, DLQ_TOPIC_NAME);
        this.props.put("errors.deadletterqueue.context.headers.enable", "true");
        this.props.put("errors.tolerance", "all");
        this.props.put("errors.deadletterqueue.topic.replication.factor", "1");
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        EmbeddedConnectUtils.waitForConnectorToStart(this.connect, CONNECTOR_NAME, Math.min(KAFKA_TOPICS.size(), 3));
        Schema sampleStructSchema = getSampleStructSchema();
        Struct sampleStructVal = getSampleStructVal(sampleStructSchema);
        SinkRecord sampleRecord = getSampleRecord(sampleStructSchema, sampleStructVal, DEFAULT_TEST_TOPIC_NAME);
        produceRecordsWithHeaders(DEFAULT_TEST_TOPIC_NAME, 30, sampleRecord);
        waitForFilesInBucket(TEST_BUCKET_NAME, 30);
        produceRecordsWithHeadersNoKey(DEFAULT_TEST_TOPIC_NAME, 1, sampleRecord);
        produceRecordsWithHeadersNoValue(DEFAULT_TEST_TOPIC_NAME, 1, sampleRecord);
        produceRecordsNoHeaders(1, sampleRecord);
        produceRecordsWithHeaders(DEFAULT_TEST_TOPIC_NAME, 30, sampleRecord);
        log.info("Waiting for files in S3...");
        waitForFilesInBucket(TEST_BUCKET_NAME, 30 * 2);
        ConsumerRecords<byte[], byte[]> consume = this.connect.kafka().consume(3, CONSUME_MAX_DURATION_MS, new String[]{DLQ_TOPIC_NAME});
        List<String> asList = Arrays.asList("Key cannot be null for SinkRecord", "Skipping null value record", "Headers cannot be null for SinkRecord");
        Assert.assertEquals(3, consume.count());
        assertDLQRecordMessages(asList, consume);
        Assert.assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, 3, sampleStructVal));
    }

    private void assertDLQRecordMessages(List<String> list, ConsumerRecords<byte[], byte[]> consumerRecords) {
        ArrayList arrayList = new ArrayList();
        Iterator it = consumerRecords.records(DLQ_TOPIC_NAME).iterator();
        while (it.hasNext()) {
            arrayList.add(new StringDeserializer().deserialize(DLQ_TOPIC_NAME, ((Header) ((ConsumerRecord) it.next()).headers().headers("__connect.errors.exception.message").iterator().next()).value()));
        }
        Collections.sort(arrayList);
        Collections.sort(list);
        for (int i = 0; i < list.size(); i++) {
            MatcherAssert.assertThat((String) arrayList.get(i), StringStartsWith.startsWith(list.get(i)));
        }
    }

    private void produceRecordsNoHeaders(int i, SinkRecord sinkRecord) throws ExecutionException, InterruptedException {
        produceRecords(sinkRecord.topic(), i, sinkRecord, true, true, false);
    }

    private void produceRecordsWithHeaders(String str, int i, SinkRecord sinkRecord) throws Exception {
        produceRecords(str, i, sinkRecord, true, true, true);
    }

    private void produceRecordsWithHeadersNoKey(String str, int i, SinkRecord sinkRecord) throws Exception {
        produceRecords(str, i, sinkRecord, false, true, true);
    }

    private void produceRecordsWithHeadersNoValue(String str, int i, SinkRecord sinkRecord) throws Exception {
        produceRecords(str, i, sinkRecord, true, false, true);
    }

    private void produceRecords(String str, int i, SinkRecord sinkRecord, boolean z, boolean z2, boolean z3) throws ExecutionException, InterruptedException {
        byte[] bArr = null;
        byte[] bArr2 = null;
        Iterable<Header> emptyList = Collections.emptyList();
        if (z) {
            bArr = this.jsonConverter.fromConnectData(str, Schema.STRING_SCHEMA, sinkRecord.key());
        }
        if (z2) {
            bArr2 = this.jsonConverter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value());
        }
        if (z3) {
            emptyList = sampleHeaders();
        }
        ProducerRecord producerRecord = new ProducerRecord(str, 0, bArr, bArr2, emptyList);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= i) {
                return;
            }
            this.producer.send(producerRecord).get();
            j = j2 + 1;
        }
    }

    private void initializeJsonConverter() {
        HashMap hashMap = new HashMap();
        hashMap.put("schemas.enable", "true");
        hashMap.put("converter.type", "value");
        this.jsonConverter = new JsonConverter();
        this.jsonConverter.configure(hashMap);
    }

    private void initializeCustomProducer() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.connect.kafka().bootstrapServers());
        hashMap.put("key.serializer", ByteArraySerializer.class.getName());
        hashMap.put("value.serializer", ByteArraySerializer.class.getName());
        this.producer = new KafkaProducer(hashMap);
    }

    private void setupProperties() {
        this.props = new HashMap();
        this.props.put("connector.class", S3SinkConnector.class.getName());
        this.props.put("tasks.max", Integer.toString(3));
        this.props.put("key.converter", JsonConverter.class.getName());
        this.props.put("value.converter", JsonConverter.class.getName());
        this.props.putAll(getAWSCredentialFromPath());
    }
}
