package io.confluent.connect.s3.integration;

import io.confluent.connect.avro.AvroConverter;
import io.confluent.connect.json.JsonSchemaConverter;
import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.connect.s3.S3SinkConnector;
import io.confluent.connect.s3.format.avro.AvroFormat;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.s3.util.EmbeddedConnectUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/s3/integration/S3SinkDataFormatIT.class */
public class S3SinkDataFormatIT extends BaseConnectorIT {
    private static final Logger log = LoggerFactory.getLogger(S3SinkDataFormatIT.class);
    private static final String KEY_CONVERTER_SCHEMA_REGISTRY_URL = "key.converter.schema.registry.url";
    private static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url";
    private static final String KEY_CONVERTER_SCRUB_INVALID_NAMES = "key.converter.scrub.invalid.names";
    private static final String VALUE_CONVERTER_SCRUB_INVALID_NAMES = "value.converter.scrub.invalid.names";
    private static final String VALUE_CONVERTER_SCHEMAS_ENABLE = "value.converter.schemas.enable";
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final Class<? extends Converter> converterClass;
    private final String connectorName;
    private final String topicName;

    public S3SinkDataFormatIT(Class<? extends Converter> cls) throws Exception {
        this.converterClass = cls;
        this.keyConverter = this.converterClass.getConstructor(new Class[0]).newInstance(new Object[0]);
        this.valueConverter = this.converterClass.getConstructor(new Class[0]).newInstance(new Object[0]);
        this.connectorName = "s3-sink-" + cls.getSimpleName();
        this.topicName = "TestTopic" + cls.getSimpleName();
    }

    @Parameterized.Parameters
    public static List<Class<? extends Converter>> data() {
        return Arrays.asList(JsonSchemaConverter.class, ProtobufConverter.class, AvroConverter.class);
    }

    @Before
    public void before() throws InterruptedException {
        this.keyConverter.configure(ImmutableMap.of("schema.registry.url", this.restApp.restServer.getURI().toString()), true);
        this.valueConverter.configure(ImmutableMap.of("schema.registry.url", this.restApp.restServer.getURI().toString()), false);
        setupProperties();
        this.props.put("topics", String.join(",", this.topicName));
        this.props.put("flush.size", Integer.toString(3));
        this.props.put("format.class", AvroFormat.class.getName());
        this.props.put("storage.class", S3Storage.class.getName());
        this.props.put("s3.bucket.name", TEST_BUCKET_NAME);
        this.props.put("key.converter", this.converterClass.getSimpleName());
        this.props.put(KEY_CONVERTER_SCHEMA_REGISTRY_URL, this.restApp.restServer.getURI().toString());
        this.props.put(KEY_CONVERTER_SCRUB_INVALID_NAMES, "true");
        this.props.put("value.converter", this.converterClass.getSimpleName());
        this.props.put(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, this.restApp.restServer.getURI().toString());
        this.props.put(VALUE_CONVERTER_SCRUB_INVALID_NAMES, "true");
        this.connect.kafka().createTopic(this.topicName, 1);
    }

    @After
    public void after() throws Exception {
        FileUtils.deleteDirectory(new File("src/test/resources/downloaded-files/"));
        clearBucket(TEST_BUCKET_NAME);
        waitForFilesInBucket(TEST_BUCKET_NAME, 0);
    }

    @Test
    public void testBasicRecordsWrittenAvro() throws Throwable {
        this.props.put("format.class", AvroFormat.class.getName());
        TreeSet<String> treeSet = new TreeSet(Collections.singletonList(this.topicName));
        this.connect.kafka().createTopic("other.avro.topic.avro", 1);
        this.connect.configureConnector(this.connectorName, this.props);
        EmbeddedConnectUtils.waitForConnectorToStart(this.connect, this.connectorName, Math.min(treeSet.size(), 3));
        Schema sampleStructSchema = getSampleStructSchema();
        Struct sampleStructVal = getSampleStructVal(sampleStructSchema);
        KafkaProducer<byte[], byte[]> configureProducer = configureProducer();
        ArrayList arrayList = new ArrayList();
        for (String str : treeSet) {
            for (int i = 0; i < 30; i++) {
                arrayList.add(getSampleTopicRecord(str, sampleStructSchema, sampleStructVal));
            }
        }
        produceRecords(configureProducer, this.keyConverter, this.valueConverter, arrayList, this.topicName);
        log.info("Waiting for files in S3...");
        int size = 10 * treeSet.size();
        waitForFilesInBucket(TEST_BUCKET_NAME, size);
        TreeSet treeSet2 = new TreeSet();
        Iterator it = treeSet.iterator();
        while (it.hasNext()) {
            List<String> expectedFilenames = getExpectedFilenames((String) it.next(), 0, 3, 30L, "avro");
            Assert.assertEquals(expectedFilenames.size(), 10);
            treeSet2.addAll(expectedFilenames);
        }
        Assert.assertEquals(treeSet2.size(), size);
        assertFileNamesValid(TEST_BUCKET_NAME, new ArrayList(treeSet2));
        Assert.assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, 3, sampleStructVal));
    }

    protected void produceRecords(KafkaProducer<byte[], byte[]> kafkaProducer, Converter converter, Converter converter2, List<SinkRecord> list, String str) {
        for (int i = 0; i < list.size(); i++) {
            byte[] fromConnectData = converter.fromConnectData(str, Schema.STRING_SCHEMA, String.valueOf(i));
            SinkRecord sinkRecord = list.get(i);
            ProducerRecord producerRecord = new ProducerRecord(str, 0, fromConnectData, converter2.fromConnectData(str, sinkRecord.valueSchema(), sinkRecord.value()));
            try {
                kafkaProducer.send(producerRecord).get(TimeUnit.SECONDS.toMillis(120L), TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                throw new KafkaException("Could not produce message: " + producerRecord, e);
            }
        }
    }

    protected KafkaProducer<byte[], byte[]> configureProducer() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.connect.kafka().bootstrapServers());
        return new KafkaProducer<>(hashMap, new ByteArraySerializer(), new ByteArraySerializer());
    }

    private void setupProperties() {
        this.props = new HashMap();
        this.props.put("connector.class", S3SinkConnector.class.getName());
        this.props.put("tasks.max", Integer.toString(3));
        this.props.putAll(getAWSCredentialFromPath());
    }
}
