package io.confluent.connect.s3.integration;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.confluent.connect.s3.S3SinkConnector;
import io.confluent.connect.s3.S3SinkConnectorConfig;
import io.confluent.connect.s3.format.avro.AvroFormat;
import io.confluent.connect.s3.format.json.JsonFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;
import io.confluent.connect.s3.storage.S3Storage;
import io.findify.s3mock.S3Mock;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.IntegrationTest;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.tools.json.JsonRecordFormatter;
import org.apache.parquet.tools.read.SimpleReadSupport;
import org.apache.parquet.tools.read.SimpleRecord;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/s3/integration/S3SinkConnectorIT.class */
public class S3SinkConnectorIT extends BaseConnectorIT {
    // Name of the environment variable inspected by useMockClient(); when it is set, the mock S3 client is used.
    private static final String JENKINS_HOME = "JENKINS_HOME";
    // Region handed to both the real and the mock S3 client builders.
    private static final String AWS_REGION = "us-west-2";
    // Endpoint of the local S3Mock server; also written to the connector's "store.url" when mocking.
    private static final String MOCK_S3_URL = "http://localhost:8001";
    // Port the S3Mock server is started on (same port as in MOCK_S3_URL).
    private static final int MOCK_S3_PORT = 8001;
    // Root of the test resources directory; TEST_DOWNLOAD_PATH lives beneath it.
    private static final String TEST_RESOURCES_PATH = "src/test/resources/";
    // Local directory into which S3 objects are downloaded for verification; removed in after().
    private static final String TEST_DOWNLOAD_PATH = "src/test/resources/downloaded-files/";
    // Name under which the connector is registered with the Connect cluster.
    private static final String CONNECTOR_NAME = "s3-sink";
    // Connector property key for the storage implementation class.
    private static final String STORAGE_CLASS_CONFIG = "storage.class";
    // Connector property key naming the dead-letter-queue topic.
    private static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name";
    // Topic that receives records the connector could not process.
    private static final String DLQ_TOPIC_NAME = "DLQ-topic";
    // Number of records produced per batch by the tests (30).
    private static final int NUM_RECORDS_INSERT = 30;
    // Value used for the connector's "flush.size" and for the expected rows per data file.
    // NOTE(review): the same constant (3) is also reused for "tasks.max" and the expected DLQ record
    // count — presumably distinct constants in the original source; confirm.
    private static final int FLUSH_SIZE_STANDARD = 3;
    // Partition index 0; the test topic is created with a single partition.
    private static final int TOPIC_PARTITION = 0;
    // Offset value 0 — presumably the Kafka offset given to sample sink records; confirm.
    private static final int DEFAULT_OFFSET = 0;
    // Converter used to serialize sample keys and values to bytes; re-created in before().
    private JsonConverter jsonConverter;
    // Byte-array producer used to publish test records; re-created in before().
    private Producer<byte[], byte[]> producer;
    private static final Logger log = LoggerFactory.getLogger(S3SinkConnectorIT.class);
    // Shared mapper used to parse downloaded file contents into JSON trees.
    private static final ObjectMapper jsonMapper = new ObjectMapper();
    // Location of the AWS credentials file whose existence enables the real S3 client.
    private static final String AWS_CRED_PATH = System.getProperty("user.home") + "/.aws/credentials";
    private static final String TEST_TOPIC_NAME = "TestTopic";
    // Topics the connector subscribes to; each is created in before().
    private static final List<String> KAFKA_TOPICS = Collections.singletonList(TEST_TOPIC_NAME);
    // Upper bound on how long to wait when consuming from the DLQ topic.
    private static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(10);
    private static final String JSON_EXTENSION = "json";
    private static final String AVRO_EXTENSION = "avro";
    private static final String PARQUET_EXTENSION = "snappy.parquet";
    // Maps a file extension (as returned by getExtensionFromKey) to the reader that parses such a file into JSON rows.
    private static final Map<String, Function<String, List<JsonNode>>> contentGetters = ImmutableMap.of(JSON_EXTENSION, S3SinkConnectorIT::getContentsFromJson, AVRO_EXTENSION, S3SinkConnectorIT::getContentsFromAvro, PARQUET_EXTENSION, S3SinkConnectorIT::getContentsFromParquet);

    /**
     * Decides whether the tests talk to the local mock S3 server instead of real S3.
     *
     * @return {@code false} only when the {@code JENKINS_HOME} environment variable is unset and
     *     the AWS credentials file exists; {@code true} in every other case
     */
    protected static boolean useMockClient() {
        // Any value of JENKINS_HOME forces the mock, without looking at the credentials file.
        if (System.getenv(JENKINS_HOME) != null) {
            return true;
        }
        return !new File(AWS_CRED_PATH).exists();
    }

    /**
     * Creates the S3 client once for the whole class and makes sure the test bucket exists and is
     * empty: an existing bucket is cleared, a missing one is created.
     */
    @BeforeClass
    public static void setupClient() {
        S3Client = getS3Client();
        if (!S3Client.doesBucketExistV2(TEST_BUCKET_NAME)) {
            S3Client.createBucket(TEST_BUCKET_NAME);
            return;
        }
        clearBucket(TEST_BUCKET_NAME);
    }

    /**
     * Deletes the test bucket after all tests in this class have finished.
     */
    @AfterClass
    public static void deleteBucket() {
        S3Client.deleteBucket(TEST_BUCKET_NAME);
    }

    /**
     * Per-test setup: builds the JSON converter and the producer, resets the connector properties
     * to the common defaults (Avro format, S3 storage, test bucket) and creates the test topics.
     */
    @Before
    public void before() {
        initializeJsonConverter();
        initializeCustomProducer();
        setupProperties();

        props.put("topics", String.join(",", KAFKA_TOPICS));
        props.put("flush.size", Integer.toString(FLUSH_SIZE_STANDARD));
        props.put("format.class", AvroFormat.class.getName());
        props.put(STORAGE_CLASS_CONFIG, S3Storage.class.getName());
        props.put("s3.bucket.name", TEST_BUCKET_NAME);
        // Point the connector at the local mock endpoint only when the mock client is in use.
        if (useMockClient()) {
            props.put("store.url", MOCK_S3_URL);
        }

        // Every topic is created with a single partition.
        for (String topic : KAFKA_TOPICS) {
            connect.kafka().createTopic(topic, 1);
        }
    }

    /**
     * Per-test cleanup: removes the local download directory, empties the test bucket, waits until
     * the bucket reports zero files, and finally closes the producer created in {@link #before()}.
     *
     * <p>The producer is closed in a {@code finally} block so that it is released even when the
     * bucket cleanup fails; previously a new producer was created for every test and never closed.
     *
     * @throws Exception if deleting the download directory or waiting for the bucket fails
     */
    @After
    public void after() throws Exception {
        try {
            FileUtils.deleteDirectory(new File(TEST_DOWNLOAD_PATH));
            clearBucket(TEST_BUCKET_NAME);
            waitForFilesInBucket(TEST_BUCKET_NAME, 0);
        } finally {
            // before() may have failed before the producer was assigned.
            if (producer != null) {
                producer.close();
                producer = null;
            }
        }
    }

    /** Runs the basic write-and-verify scenario with the Avro output format. */
    @Test
    public void testFilesWrittenToBucketAvro() throws Throwable {
        final String formatClass = AvroFormat.class.getName();
        props.put("format.class", formatClass);
        testBasicRecordsWritten(AVRO_EXTENSION);
    }

    /** Runs the basic write-and-verify scenario with the Parquet output format. */
    @Test
    public void testFilesWrittenToBucketParquet() throws Throwable {
        final String formatClass = ParquetFormat.class.getName();
        props.put("format.class", formatClass);
        testBasicRecordsWritten(PARQUET_EXTENSION);
    }

    /** Runs the basic write-and-verify scenario with the JSON output format. */
    @Test
    public void testFilesWrittenToBucketJson() throws Throwable {
        final String formatClass = JsonFormat.class.getName();
        props.put("format.class", formatClass);
        testBasicRecordsWritten(JSON_EXTENSION);
    }

    /**
     * Shared scenario for the per-format tests: starts the connector, produces
     * {@code NUM_RECORDS_INSERT} copies of the sample record without headers, waits for the
     * resulting files and checks both their names and their contents.
     *
     * @param str file extension expected for the written objects (one of the *_EXTENSION constants)
     * @throws Throwable if the connector does not start, producing fails, or a wait fails
     */
    private void testBasicRecordsWritten(String str) throws Throwable {
        connect.configureConnector(CONNECTOR_NAME, props);
        final int expectedTasks = Math.min(KAFKA_TOPICS.size(), FLUSH_SIZE_STANDARD);
        waitForConnectorToStart(CONNECTOR_NAME, expectedTasks);

        final Schema valueSchema = getSampleStructSchema();
        final Struct expectedRow = getSampleStructVal(valueSchema);
        produceRecordsNoHeaders(NUM_RECORDS_INSERT, getSampleRecord(valueSchema, expectedRow));

        log.info("Waiting for files in S3...");
        // One file per flush: 30 records / flush size 3 = 10 files.
        final int expectedFileCount = NUM_RECORDS_INSERT / FLUSH_SIZE_STANDARD;
        waitForFilesInBucket(TEST_BUCKET_NAME, expectedFileCount);

        Assert.assertTrue(
            fileNamesValid(
                TEST_BUCKET_NAME,
                getExpectedFilenames(TEST_TOPIC_NAME, TOPIC_PARTITION, FLUSH_SIZE_STANDARD, (long) NUM_RECORDS_INSERT, str)));
        Assert.assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, expectedRow));
    }

    /**
     * Verifies that records the connector cannot process are routed to the dead letter queue.
     *
     * <p>With key and header storage enabled, a batch of valid records is written first. Then one
     * record without a key, one without a value and one without headers are produced, followed by a
     * second valid batch. The test expects exactly three DLQ records whose exception messages start
     * with the expected texts, and the data files to still contain only the sample row.
     */
    @Test
    public void testFaultyRecordsReportedToDLQ() throws Throwable {
        props.put("key.converter", StringConverter.class.getName());
        props.put("behavior.on.null.values", S3SinkConnectorConfig.BehaviorOnNullValues.IGNORE.toString());
        props.put("store.kafka.keys", "true");
        props.put("store.kafka.headers", "true");
        props.put(DLQ_TOPIC_CONFIG, DLQ_TOPIC_NAME);
        props.put("errors.deadletterqueue.context.headers.enable", "true");
        props.put("errors.tolerance", "all");
        props.put("errors.deadletterqueue.topic.replication.factor", "1");

        connect.configureConnector(CONNECTOR_NAME, props);
        final int expectedTasks = Math.min(KAFKA_TOPICS.size(), FLUSH_SIZE_STANDARD);
        waitForConnectorToStart(CONNECTOR_NAME, expectedTasks);

        final Schema valueSchema = getSampleStructSchema();
        final Struct expectedRow = getSampleStructVal(valueSchema);
        final SinkRecord template = getSampleRecord(valueSchema, expectedRow);

        // First valid batch.
        // NOTE(review): the expected file count equals the record count (30) — presumably because
        // each flush of 3 records yields a data, a keys and a headers object; confirm.
        produceRecordsWithHeaders(TEST_TOPIC_NAME, NUM_RECORDS_INSERT, template);
        waitForFilesInBucket(TEST_BUCKET_NAME, NUM_RECORDS_INSERT);

        // Three faulty records: missing key, missing value, missing headers.
        produceRecordsWithHeadersNoKey(TEST_TOPIC_NAME, 1, template);
        produceRecordsWithHeadersNoValue(TEST_TOPIC_NAME, 1, template);
        produceRecordsNoHeaders(1, template);

        // Second valid batch.
        produceRecordsWithHeaders(TEST_TOPIC_NAME, NUM_RECORDS_INSERT, template);
        log.info("Waiting for files in S3...");
        waitForFilesInBucket(TEST_BUCKET_NAME, NUM_RECORDS_INSERT * 2);

        // One DLQ entry is expected per faulty record produced above.
        final int faultyRecordCount = 3;
        final ConsumerRecords<byte[], byte[]> dlqRecords =
            connect.kafka().consume(faultyRecordCount, CONSUME_MAX_DURATION_MS, new String[]{DLQ_TOPIC_NAME});
        final List<String> expectedMessages = Arrays.asList(
            "Key cannot be null for SinkRecord",
            "Skipping null value record",
            "Headers cannot be null for SinkRecord");

        Assert.assertEquals(faultyRecordCount, dlqRecords.count());
        assertDLQRecordMessages(expectedMessages, dlqRecords);
        Assert.assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, expectedRow));
    }

    /**
     * Asserts that the exception messages attached to the DLQ records start with the expected
     * texts.
     *
     * <p>Both the actual messages and the expected prefixes are sorted and then compared pairwise,
     * so the order in which records reached the DLQ does not matter. The caller's list is copied
     * before sorting and is therefore left untouched (it may be immutable). A missing header or a
     * record-count mismatch is reported as an assertion failure rather than as an
     * index/iterator exception.
     *
     * @param list expected message prefixes, in any order
     * @param consumerRecords records consumed from the DLQ topic
     */
    private void assertDLQRecordMessages(List<String> list, ConsumerRecords<byte[], byte[]> consumerRecords) {
        final StringDeserializer deserializer = new StringDeserializer();
        final List<String> actualMessages = new ArrayList<>();
        for (ConsumerRecord<byte[], byte[]> record : consumerRecords.records(DLQ_TOPIC_NAME)) {
            Iterator<Header> messageHeaders =
                record.headers().headers("__connect.errors.exception.message").iterator();
            Assert.assertTrue(
                "DLQ record at offset " + record.offset() + " has no exception message header",
                messageHeaders.hasNext());
            actualMessages.add(deserializer.deserialize(DLQ_TOPIC_NAME, messageHeaders.next().value()));
        }

        // Sort a private copy so the argument is never mutated.
        final List<String> expectedPrefixes = new ArrayList<>(list);
        Collections.sort(actualMessages);
        Collections.sort(expectedPrefixes);

        Assert.assertEquals("Unexpected number of DLQ records", expectedPrefixes.size(), actualMessages.size());
        for (int idx = 0; idx < expectedPrefixes.size(); idx++) {
            MatcherAssert.assertThat(actualMessages.get(idx), StringStartsWith.startsWith(expectedPrefixes.get(idx)));
        }
    }

    /**
     * Produces {@code i} records with key and value but without headers, to the topic named in the
     * given sink record.
     *
     * @param i number of records to send
     * @param sinkRecord template supplying topic, key, value schema and value
     */
    private void produceRecordsNoHeaders(int i, SinkRecord sinkRecord) throws ExecutionException, InterruptedException {
        final String topic = sinkRecord.topic();
        final boolean includeKey = true;
        final boolean includeValue = true;
        final boolean includeHeaders = false;
        produceRecords(topic, i, sinkRecord, includeKey, includeValue, includeHeaders);
    }

    /**
     * Produces {@code i} complete records (key, value and sample headers) to the given topic.
     *
     * @param str topic to produce to
     * @param i number of records to send
     * @param sinkRecord template supplying key, value schema and value
     */
    private void produceRecordsWithHeaders(String str, int i, SinkRecord sinkRecord) throws Exception {
        final boolean includeKey = true;
        final boolean includeValue = true;
        final boolean includeHeaders = true;
        produceRecords(str, i, sinkRecord, includeKey, includeValue, includeHeaders);
    }

    /**
     * Produces {@code i} records with value and sample headers but a {@code null} key.
     *
     * @param str topic to produce to
     * @param i number of records to send
     * @param sinkRecord template supplying value schema and value
     */
    private void produceRecordsWithHeadersNoKey(String str, int i, SinkRecord sinkRecord) throws Exception {
        final boolean includeKey = false;
        final boolean includeValue = true;
        final boolean includeHeaders = true;
        produceRecords(str, i, sinkRecord, includeKey, includeValue, includeHeaders);
    }

    /**
     * Produces {@code i} records with key and sample headers but a {@code null} value.
     *
     * @param str topic to produce to
     * @param i number of records to send
     * @param sinkRecord template supplying the key
     */
    private void produceRecordsWithHeadersNoValue(String str, int i, SinkRecord sinkRecord) throws Exception {
        final boolean includeKey = true;
        final boolean includeValue = false;
        final boolean includeHeaders = true;
        produceRecords(str, i, sinkRecord, includeKey, includeValue, includeHeaders);
    }

    /**
     * Serializes the template record with the JSON converter and sends it {@code i} times to
     * partition 0 of the given topic, waiting for each send to complete before the next one.
     *
     * @param str topic to produce to
     * @param i number of records to send; nothing is sent when it is zero or negative
     * @param sinkRecord template supplying key, value schema and value
     * @param z whether to include the serialized key ({@code null} key otherwise)
     * @param z2 whether to include the serialized value ({@code null} value otherwise)
     * @param z3 whether to attach the sample headers (no headers otherwise)
     * @throws ExecutionException if a send fails
     * @throws InterruptedException if interrupted while waiting for a send to complete
     */
    private void produceRecords(String str, int i, SinkRecord sinkRecord, boolean z, boolean z2, boolean z3) throws ExecutionException, InterruptedException {
        final byte[] serializedKey = z
            ? jsonConverter.fromConnectData(str, Schema.STRING_SCHEMA, sinkRecord.key())
            : null;
        final byte[] serializedValue = z2
            ? jsonConverter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value())
            : null;
        final Iterable<Header> headers = z3 ? sampleHeaders() : Collections.<Header>emptyList();

        // The same record instance is re-sent for every iteration.
        final ProducerRecord<byte[], byte[]> outgoing =
            new ProducerRecord<>(str, TOPIC_PARTITION, serializedKey, serializedValue, headers);
        for (int sent = 0; sent < i; sent++) {
            producer.send(outgoing).get();
        }
    }

    /**
     * Builds the template sink record for the test topic: partition 0, string key {@code "key"},
     * the given value and offset 0.
     *
     * @param schema schema of the record value
     * @param struct record value
     * @return the template record
     */
    private SinkRecord getSampleRecord(Schema schema, Struct struct) {
        final String sampleKey = "key";
        return new SinkRecord(
            TEST_TOPIC_NAME,
            TOPIC_PARTITION,
            Schema.STRING_SCHEMA,
            sampleKey,
            schema,
            struct,
            DEFAULT_OFFSET);
    }

    /**
     * Builds the two fixed headers attached to records produced "with headers".
     *
     * @return a fixed-size list holding the first and the second sample header
     */
    private Iterable<Header> sampleHeaders() {
        final Header first = new RecordHeader("first-header-key", "first-header-value".getBytes());
        final Header second = new RecordHeader("second-header-key", "second-header-value".getBytes());
        return Arrays.asList(first, second);
    }

    /**
     * Builds the struct schema of the sample value: one field each of int64, boolean, int32,
     * float32, float64 and string.
     *
     * @return the built schema
     */
    private Schema getSampleStructSchema() {
        final SchemaBuilder structBuilder = SchemaBuilder.struct();
        structBuilder.field("ID", Schema.INT64_SCHEMA);
        structBuilder.field("myBool", Schema.BOOLEAN_SCHEMA);
        structBuilder.field("myInt32", Schema.INT32_SCHEMA);
        structBuilder.field("myFloat32", Schema.FLOAT32_SCHEMA);
        structBuilder.field("myFloat64", Schema.FLOAT64_SCHEMA);
        structBuilder.field("myString", Schema.STRING_SCHEMA);
        return structBuilder.build();
    }

    /**
     * Builds the sample value that every produced record carries and that every downloaded row is
     * compared against.
     *
     * <p>A leftover statement that created a {@code Date} and immediately discarded it has been
     * removed; it had no effect on the returned struct.
     *
     * @param schema schema the struct is created for; must define the six fields set here
     * @return the populated struct
     */
    private Struct getSampleStructVal(Schema schema) {
        return new Struct(schema)
            .put("ID", 1L)
            .put("myBool", true)
            .put("myInt32", 32)
            .put("myFloat32", 3.2f)
            .put("myFloat64", 64.64d)
            .put("myString", "theStringVal");
    }

    /**
     * Creates the S3 client used by the tests.
     *
     * <p>When {@link #useMockClient()} is true, an in-memory S3Mock server is started on
     * {@code MOCK_S3_PORT} and a path-style, anonymous client pointing at it is returned;
     * otherwise a standard client for {@code AWS_REGION} is built.
     *
     * @return the real or the mock-backed client
     */
    private static AmazonS3 getS3Client() {
        if (useMockClient()) {
            final S3Mock mockServer = new S3Mock.Builder().withPort(MOCK_S3_PORT).withInMemoryBackend().build();
            mockServer.start();

            final AwsClientBuilder.EndpointConfiguration mockEndpoint =
                new AwsClientBuilder.EndpointConfiguration(MOCK_S3_URL, AWS_REGION);
            final AWSStaticCredentialsProvider anonymousCredentials =
                new AWSStaticCredentialsProvider(new AnonymousAWSCredentials());
            final AmazonS3 mockClient = AmazonS3ClientBuilder.standard()
                .withPathStyleAccessEnabled(true)
                .withEndpointConfiguration(mockEndpoint)
                .withCredentials(anonymousCredentials)
                .build();
            log.info("No credentials found, using mock S3 client.");
            return mockClient;
        }

        log.info("Credentials found, using real S3 client.");
        return AmazonS3ClientBuilder.standard().withRegion(AWS_REGION).build();
    }

    /**
     * Deletes every object returned by a single listing of the given bucket.
     *
     * <p>NOTE(review): only one {@code listObjectsV2} call is made — presumably sufficient for the
     * small number of files these tests write; confirm if the bucket can hold more than one
     * listing page.
     *
     * @param str name of the bucket to empty
     */
    private static void clearBucket(String str) {
        for (S3ObjectSummary summary : S3Client.listObjectsV2(str).getObjectSummaries()) {
            S3Client.deleteObject(str, summary.getKey());
        }
    }

    /**
     * Downloads every data file in the bucket (keys/headers side files are skipped) and checks that
     * each contains exactly {@code i} rows equal to the expected struct.
     *
     * @param str bucket to inspect
     * @param i expected number of rows per file
     * @param struct expected content of every row
     * @return {@code true} if all files match; {@code false} at the first file that does not
     */
    private boolean fileContentsAsExpected(String str, int i, Struct struct) {
        log.info("expectedRow: {}", struct);
        final List<S3ObjectSummary> summaries = S3Client.listObjectsV2(str).getObjectSummaries();
        for (String objectKey : getS3FileListValues(summaries)) {
            final String localPath = TEST_DOWNLOAD_PATH + objectKey;
            final File localCopy = new File(localPath);
            log.info("Saving file to : {}", localPath);
            S3Client.getObject(new GetObjectRequest(str, objectKey), localCopy);

            // Pick the parser by file extension and turn the file into JSON rows.
            final Function<String, List<JsonNode>> parser = contentGetters.get(getExtensionFromKey(objectKey));
            final List<JsonNode> rows = parser.apply(localPath);
            if (!fileContentsMatchExpected(rows, i, struct)) {
                return false;
            }
            localCopy.delete();
        }
        return true;
    }

    /**
     * Checks that a parsed file has the expected number of rows and that every row equals the
     * expected struct.
     *
     * @param list rows parsed from one file
     * @param i expected number of rows
     * @param struct expected content of every row
     * @return {@code true} only if the row count is {@code i} and all rows match
     */
    private boolean fileContentsMatchExpected(List<JsonNode> list, int i, Struct struct) {
        final int actualRows = list.size();
        if (actualRows != i) {
            log.error("Number of rows in file do not match the expected count, actual: {}, expected: {}", actualRows, i);
            return false;
        }
        // allMatch stops at the first row that differs.
        return list.stream().allMatch(row -> fileRowMatchesExpectedRow(row, struct));
    }

    /**
     * Compares one row read from a file against the expected struct, field by field.
     *
     * <p>Values are compared as strings: the struct value's string form against the JSON node's
     * text with one leading and one trailing double quote removed. A field that is absent from the
     * row is reported as a mismatch instead of causing a {@code NullPointerException}; a
     * {@code null} expected value is compared as the text {@code "null"}.
     *
     * @param jsonNode row read from the file
     * @param struct expected row
     * @return {@code true} if every schema field is present in the row with the expected value
     */
    private boolean fileRowMatchesExpectedRow(JsonNode jsonNode, Struct struct) {
        log.debug("Comparing rows: file: {}, expected: {}", jsonNode, struct);
        for (Field field : struct.schema().fields()) {
            final JsonNode actualNode = jsonNode.get(field.name());
            if (actualNode == null) {
                log.error("Field {} is missing from file row: {}", field.name(), jsonNode);
                return false;
            }
            final String expectedValue = String.valueOf(struct.get(field));
            // JSON string nodes render with surrounding quotes; strip them before comparing.
            final String actualValue = actualNode.toString().replaceAll("^\"|\"$", "");
            log.debug("Comparing values: {}, {}", expectedValue, actualValue);
            if (!actualValue.equals(expectedValue)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reads all records from a local Avro container file and returns each as a JSON tree built
     * from the record's string form.
     *
     * <p>The file reader is opened in try-with-resources so it is closed on both success and
     * failure; previously it was never closed.
     *
     * @param str path of the Avro file
     * @return the parsed records, in file order
     * @throws RuntimeException wrapping any {@link IOException} raised while reading or parsing
     */
    private static List<JsonNode> getContentsFromAvro(String str) {
        try (DataFileReader<GenericRecord> avroReader =
                 new DataFileReader<>(new File(str), new GenericDatumReader<GenericRecord>())) {
            final List<JsonNode> rows = new ArrayList<>();
            while (avroReader.hasNext()) {
                rows.add(jsonMapper.readTree(avroReader.next().toString()));
            }
            return rows;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads all records from a local Parquet file and returns each as a JSON tree, using a JSON
     * formatter derived from the schema in the file's footer.
     *
     * <p>The record reader is opened in try-with-resources so it is closed on both success and
     * failure; previously it was never closed.
     *
     * @param str path of the Parquet file
     * @return the parsed records, in file order
     * @throws RuntimeException wrapping any {@link IOException} raised while reading or parsing
     */
    private static List<JsonNode> getContentsFromParquet(String str) {
        final Path parquetPath = new Path(str);
        try (ParquetReader<SimpleRecord> recordReader =
                 ParquetReader.builder(new SimpleReadSupport(), parquetPath).build()) {
            final JsonRecordFormatter.JsonGroupFormatter formatter = JsonRecordFormatter.fromSchema(
                ParquetFileReader.readFooter(new Configuration(), parquetPath).getFileMetaData().getSchema());
            final List<JsonNode> rows = new ArrayList<>();
            // read() returns null once the file is exhausted.
            SimpleRecord current;
            while ((current = recordReader.read()) != null) {
                rows.add(jsonMapper.readTree(formatter.formatRecord(current)));
            }
            return rows;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads a local file containing one JSON document per line and returns the parsed documents.
     *
     * <p>The reader is opened in try-with-resources so the underlying file handle is released on
     * both success and failure; previously it was never closed.
     *
     * @param str path of the JSON-lines file
     * @return one JSON tree per line, in file order
     * @throws RuntimeException wrapping any {@link IOException} raised while reading or parsing
     */
    private static List<JsonNode> getContentsFromJson(String str) {
        try (BufferedReader lineReader = new BufferedReader(new FileReader(new File(str)))) {
            final List<JsonNode> rows = new ArrayList<>();
            String line;
            while ((line = lineReader.readLine()) != null) {
                rows.add(jsonMapper.readTree(line));
            }
            return rows;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns everything after the first {@code '.'} in the given object key, e.g.
     * {@code "snappy.parquet"} for a key ending in {@code ".snappy.parquet"}.
     *
     * @param str object key to inspect
     * @return the text following the first dot (may be empty when the key ends with that dot)
     * @throws RuntimeException if the key contains no dot
     */
    private String getExtensionFromKey(String str) {
        final int firstDot = str.indexOf('.');
        if (firstDot < 0) {
            throw new RuntimeException("Could not parse extension from filename.");
        }
        return str.substring(firstDot + 1);
    }

    /**
     * Creates the JSON converter used to serialize sample records, configured as a value converter
     * with schemas enabled.
     */
    private void initializeJsonConverter() {
        final Map<String, String> converterConfig = new HashMap<>();
        converterConfig.put("schemas.enable", "true");
        converterConfig.put("converter.type", "value");

        jsonConverter = new JsonConverter();
        jsonConverter.configure(converterConfig);
    }

    /**
     * Tells whether the file name ends with at least one of the given suffixes.
     *
     * @param str file name (object key) to test
     * @param list suffixes to look for
     * @return {@code true} if any suffix matches the end of the name; {@code false} for an empty list
     */
    private boolean filenameContainsExtensions(String str, List<String> list) {
        // anyMatch stops at the first matching suffix.
        return list.stream().anyMatch(str::endsWith);
    }

    /**
     * Extracts the keys of the given object summaries, leaving out objects whose key ends with
     * {@code ".headers.avro"} or {@code ".keys.avro"}.
     *
     * @param list object summaries from a bucket listing
     * @return keys of the remaining objects, in listing order
     */
    private List<String> getS3FileListValues(List<S3ObjectSummary> list) {
        final List<String> sideFileSuffixes = Arrays.asList(".headers.avro", ".keys.avro");
        final List<String> dataFileKeys = new ArrayList<>();
        for (S3ObjectSummary summary : list) {
            if (!filenameContainsExtensions(summary.getKey(), sideFileSuffixes)) {
                dataFileKeys.add(summary.getKey());
            }
        }
        return dataFileKeys;
    }

    /**
     * Creates the byte-array producer used to publish test records, connected to the embedded
     * Kafka cluster's bootstrap servers.
     */
    private void initializeCustomProducer() {
        final String byteArraySerializer = ByteArraySerializer.class.getName();
        final Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put("bootstrap.servers", connect.kafka().bootstrapServers());
        producerConfig.put("key.serializer", byteArraySerializer);
        producerConfig.put("value.serializer", byteArraySerializer);
        producer = new KafkaProducer<>(producerConfig);
    }

    /**
     * Resets the connector properties to a fresh map holding the connector class, the task count
     * and JSON converters for both key and value.
     *
     * <p>NOTE(review): "tasks.max" reuses {@code FLUSH_SIZE_STANDARD} (3) — presumably a separate
     * task-count constant in the original source; confirm.
     */
    private void setupProperties() {
        final String jsonConverterClass = JsonConverter.class.getName();
        props = new HashMap<>();
        props.put("connector.class", S3SinkConnector.class.getName());
        props.put("tasks.max", Integer.toString(FLUSH_SIZE_STANDARD));
        props.put("key.converter", jsonConverterClass);
        props.put("value.converter", jsonConverterClass);
    }
}
