package io.confluent.connect.s3.integration;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.s3.util.S3Utils;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestUtils;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.tools.json.JsonRecordFormatter;
import org.apache.parquet.tools.read.SimpleReadSupport;
import org.apache.parquet.tools.read.SimpleRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/s3/integration/BaseConnectorIT.class */
public abstract class BaseConnectorIT {
    protected static final int MAX_TASKS = 3;
    protected RestApp restApp;
    protected static final int TOPIC_PARTITION = 0;
    protected static final int DEFAULT_OFFSET = 0;
    protected static final int NUM_RECORDS_INSERT = 30;
    protected static final int FLUSH_SIZE_STANDARD = 3;
    protected static AmazonS3 S3Client;
    protected static final String AWS_REGION = "us-west-2";
    protected static final String AWS_CREDENTIALS_PATH = "AWS_CREDENTIALS_PATH";
    protected static final String TEST_RESOURCES_PATH = "src/test/resources/";
    protected static final String TEST_DOWNLOAD_PATH = "src/test/resources/downloaded-files/";
    protected static final String STORAGE_CLASS_CONFIG = "storage.class";
    protected EmbeddedConnectCluster connect;
    protected Map<String, String> props;
    private static final Logger log = LoggerFactory.getLogger(BaseConnectorIT.class);
    private static final long S3_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60);
    protected static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.MINUTES.toMillis(60);
    protected static final ObjectMapper jsonMapper = new ObjectMapper();
    protected static final String JSON_EXTENSION = "json";
    protected static final String AVRO_EXTENSION = "avro";
    protected static final String PARQUET_EXTENSION = "snappy.parquet";
    private static final Map<String, Function<String, List<JsonNode>>> contentGetters = ImmutableMap.of(JSON_EXTENSION, BaseConnectorIT::getContentsFromJson, AVRO_EXTENSION, BaseConnectorIT::getContentsFromAvro, PARQUET_EXTENSION, BaseConnectorIT::getContentsFromParquet);
    protected static final String TEST_BUCKET_NAME = "connect-s3-integration-testing-" + System.currentTimeMillis();

    @BeforeClass
    public static void setupClient() {
        log.info("Starting ITs...");
        S3Client = getS3Client();
        if (S3Client.doesBucketExistV2(TEST_BUCKET_NAME)) {
            clearBucket(TEST_BUCKET_NAME);
        } else {
            S3Client.createBucket(TEST_BUCKET_NAME);
        }
    }

    @AfterClass
    public static void deleteBucket() {
        S3Client.deleteBucket(TEST_BUCKET_NAME);
        log.info("Finished ITs, removed S3 bucket");
    }

    @Before
    public void setup() throws Exception {
        startConnect();
        startSchemaRegistry();
    }

    @After
    public void close() throws Exception {
        stopSchemaRegistry();
        this.connect.stop();
    }

    protected void startConnect() {
        HashMap hashMap = new HashMap();
        hashMap.put("plugin.discovery", "hybrid_warn");
        this.connect = new EmbeddedConnectCluster.Builder().name("s3-connect-cluster").workerProps(hashMap).build();
        this.connect.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long waitForFilesInBucket(String str, int i) throws InterruptedException {
        return S3Utils.waitForFilesInBucket(S3Client, str, i, S3_TIMEOUT_MS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<String> getExpectedFilenames(String str, int i, int i2, long j, String str2) {
        int i3 = ((int) j) / i2;
        ArrayList arrayList = new ArrayList();
        int i4 = 0;
        while (true) {
            int i5 = i4;
            if (i5 >= i3 * i2) {
                return arrayList;
            }
            arrayList.add(String.format("topics/%s/partition=%d/%s+%d+%010d.%s", str, Integer.valueOf(i), str, Integer.valueOf(i), Integer.valueOf(i5), str2));
            i4 = i5 + i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<String> getExpectedTombstoneFilenames(String str, int i, int i2, long j, String str2, String str3) {
        int i3 = ((int) j) / i2;
        ArrayList arrayList = new ArrayList();
        int i4 = 0;
        while (true) {
            int i5 = i4;
            if (i5 >= i3 * i2) {
                return arrayList;
            }
            arrayList.add(String.format("topics/%s/%s/%s+%d+%010d.keys.%s", str, str3, str, Integer.valueOf(i), Integer.valueOf(i5), str2));
            i4 = i5 + i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertFileNamesValid(String str, List<String> list) {
        Assertions.assertThat(getBucketFileNames(str)).containsExactlyInAnyOrderElementsOf(list);
    }

    private List<String> getBucketFileNames(String str) {
        ListObjectsV2Result listObjectsV2;
        ArrayList arrayList = new ArrayList();
        ListObjectsV2Request withBucketName = new ListObjectsV2Request().withBucketName(str);
        do {
            listObjectsV2 = S3Client.listObjectsV2(withBucketName);
            Iterator it = listObjectsV2.getObjectSummaries().iterator();
            while (it.hasNext()) {
                arrayList.add(((S3ObjectSummary) it.next()).getKey());
            }
            withBucketName.setContinuationToken(listObjectsV2.getNextContinuationToken());
        } while (listObjectsV2.isTruncated());
        return arrayList;
    }

    protected void startSchemaRegistry() throws Exception {
        this.restApp = new RestApp(findAvailableOpenPort().intValue(), (String) null, this.connect.kafka().bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, true, new Properties());
        this.restApp.start();
        waitForSchemaRegistryToStart();
    }

    protected void stopSchemaRegistry() throws Exception {
        if (this.restApp != null) {
            this.restApp.stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForSchemaRegistryToStart() throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return this.restApp.restServer.isRunning();
        }, CONNECTOR_STARTUP_DURATION_MS, "Schema-registry server did not start in time.");
    }

    private Integer findAvailableOpenPort() throws IOException {
        ServerSocket serverSocket = new ServerSocket(0);
        Throwable th = null;
        try {
            Integer valueOf = Integer.valueOf(serverSocket.getLocalPort());
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    serverSocket.close();
                }
            }
            return valueOf;
        } catch (Throwable th3) {
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serverSocket.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Schema getSampleStructSchema() {
        return SchemaBuilder.struct().field("ID", Schema.INT64_SCHEMA).field("myBool", Schema.BOOLEAN_SCHEMA).field("myInt32", Schema.INT32_SCHEMA).field("myFloat32", Schema.FLOAT32_SCHEMA).field("myFloat64", Schema.FLOAT64_SCHEMA).field("myString", Schema.STRING_SCHEMA).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Struct getSampleStructVal(Schema schema) {
        new Date(1111111L).setTime(0L);
        return new Struct(schema).put("ID", 1L).put("myBool", true).put("myInt32", 32).put("myFloat32", Float.valueOf(3.2f)).put("myFloat64", Double.valueOf(64.64d)).put("myString", "theStringVal");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SinkRecord getSampleTopicRecord(String str, Schema schema, Struct struct) {
        return new SinkRecord(str, 0, Schema.STRING_SCHEMA, "key", schema, struct, 0L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SinkRecord getSampleRecord(Schema schema, Struct struct, String str) {
        return getSampleTopicRecord(str, schema, struct);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Iterable<Header> sampleHeaders() {
        return Arrays.asList(new RecordHeader("first-header-key", "first-header-value".getBytes()), new RecordHeader("second-header-key", "second-header-value".getBytes()));
    }

    protected static AmazonS3 getS3Client() {
        Map<String, String> aWSCredentialFromPath = getAWSCredentialFromPath();
        if (aWSCredentialFromPath.size() != 2) {
            return (AmazonS3) AmazonS3ClientBuilder.standard().withRegion(AWS_REGION).build();
        }
        return (AmazonS3) AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(aWSCredentialFromPath.get("aws.access.key.id"), aWSCredentialFromPath.get("aws.secret.access.key")))).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void clearBucket(String str) {
        Iterator it = S3Client.listObjectsV2(str).getObjectSummaries().iterator();
        while (it.hasNext()) {
            S3Client.deleteObject(str, ((S3ObjectSummary) it.next()).getKey());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean fileContentsAsExpected(String str, int i, Struct struct) {
        log.info("expectedRow: {}", struct);
        for (String str2 : getS3FileListValues(S3Client.listObjectsV2(str).getObjectSummaries())) {
            String str3 = TEST_DOWNLOAD_PATH + str2;
            File file = new File(str3);
            log.info("Saving file to : {}", str3);
            S3Client.getObject(new GetObjectRequest(str, str2), file);
            if (!fileContentsMatchExpected(contentGetters.get(getExtensionFromKey(str2)).apply(str3), i, struct)) {
                return false;
            }
            file.delete();
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean keyfileContentsAsExpected(String str, int i, String str2) {
        log.info("expectedKey: {}", str2);
        for (String str3 : getS3KeyFileList(S3Client.listObjectsV2(str).getObjectSummaries())) {
            String str4 = TEST_DOWNLOAD_PATH + str3;
            File file = new File(str4);
            log.info("Saving file to : {}", str4);
            S3Client.getObject(new GetObjectRequest(str, str3), file);
            ArrayList<String> arrayList = new ArrayList();
            try {
                FileReader fileReader = new FileReader(str4);
                Throwable th = null;
                try {
                    try {
                        BufferedReader bufferedReader = new BufferedReader(fileReader);
                        Throwable th2 = null;
                        while (true) {
                            try {
                                try {
                                    String readLine = bufferedReader.readLine();
                                    if (readLine == null) {
                                        break;
                                    }
                                    arrayList.add(readLine);
                                } finally {
                                }
                            } finally {
                            }
                        }
                        if (bufferedReader != null) {
                            if (0 != 0) {
                                try {
                                    bufferedReader.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                bufferedReader.close();
                            }
                        }
                        if (fileReader != null) {
                            if (0 != 0) {
                                try {
                                    fileReader.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                fileReader.close();
                            }
                        }
                        if (arrayList.size() != i) {
                            log.error("Actual number of records in the key file {}, Expected number of records {}", Integer.valueOf(arrayList.size()), Integer.valueOf(i));
                            return false;
                        }
                        for (String str5 : arrayList) {
                            if (!str2.equals(str5)) {
                                log.error("Key {} did not match the contents in the key file {}", str2, str5);
                                return false;
                            }
                            log.info("Key {} matched the contents in the key file {}", str2, str5);
                        }
                        file.delete();
                    } finally {
                    }
                } finally {
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return true;
    }

    protected boolean fileContentsMatchExpected(List<JsonNode> list, int i, Struct struct) {
        if (list.size() != i) {
            log.error("Number of rows in file do not match the expected count, actual: {}, expected: {}", Integer.valueOf(list.size()), Integer.valueOf(i));
            return false;
        }
        Iterator<JsonNode> it = list.iterator();
        while (it.hasNext()) {
            if (!fileRowMatchesExpectedRow(it.next(), struct)) {
                return false;
            }
        }
        return true;
    }

    private List<String> getS3KeyFileList(List<S3ObjectSummary> list) {
        return (List) list.stream().filter(s3ObjectSummary -> {
            return s3ObjectSummary.getKey().contains(".keys.");
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
    }

    private boolean fileRowMatchesExpectedRow(JsonNode jsonNode, Struct struct) {
        log.debug("Comparing rows: file: {}, expected: {}", jsonNode, struct);
        for (Field field : struct.schema().fields()) {
            String obj = struct.get(field).toString();
            String replaceAll = jsonNode.get(field.name()).toString().replaceAll("^\"|\"$", "");
            log.debug("Comparing values: {}, {}", obj, replaceAll);
            if (!replaceAll.equals(obj)) {
                return false;
            }
        }
        return true;
    }

    private boolean filenameContainsExtensions(String str, List<String> list) {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            if (str.endsWith(it.next())) {
                return true;
            }
        }
        return false;
    }

    private List<String> getS3FileListValues(List<S3ObjectSummary> list) {
        List asList = Arrays.asList(".headers.avro", ".keys.avro");
        return (List) list.stream().filter(s3ObjectSummary -> {
            return !filenameContainsExtensions(s3ObjectSummary.getKey(), asList);
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
    }

    private static List<JsonNode> getContentsFromAvro(String str) {
        try {
            DataFileReader dataFileReader = new DataFileReader(new File(str), new GenericDatumReader());
            ArrayList arrayList = new ArrayList();
            while (dataFileReader.hasNext()) {
                arrayList.add(jsonMapper.readTree(((GenericRecord) dataFileReader.next()).toString()));
            }
            return arrayList;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static List<JsonNode> getContentsFromParquet(String str) {
        try {
            ParquetReader build = ParquetReader.builder(new SimpleReadSupport(), new Path(str)).build();
            Throwable th = null;
            try {
                JsonRecordFormatter.JsonGroupFormatter fromSchema = JsonRecordFormatter.fromSchema(ParquetFileReader.readFooter(new Configuration(), new Path(str)).getFileMetaData().getSchema());
                ArrayList arrayList = new ArrayList();
                for (SimpleRecord simpleRecord = (SimpleRecord) build.read(); simpleRecord != null; simpleRecord = (SimpleRecord) build.read()) {
                    arrayList.add(jsonMapper.readTree(fromSchema.formatRecord(simpleRecord)));
                }
                return arrayList;
            } finally {
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static List<JsonNode> getContentsFromJson(String str) {
        try {
            BufferedReader bufferedReader = new BufferedReader(new FileReader(str));
            ArrayList arrayList = new ArrayList();
            while (true) {
                String readLine = bufferedReader.readLine();
                if (readLine == null) {
                    return arrayList;
                }
                arrayList.add(jsonMapper.readTree(readLine));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static String getExtensionFromKey(String str) {
        String[] split = str.split("/");
        String str2 = split[split.length - 1];
        if (str2.endsWith(".snappy.parquet")) {
            return PARQUET_EXTENSION;
        }
        int lastIndexOf = str2.lastIndexOf(46);
        if (lastIndexOf < 0) {
            throw new RuntimeException("Could not parse extension from filename: " + str);
        }
        return str2.substring(lastIndexOf + 1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Map<String, String> getAWSCredentialFromPath() {
        HashMap hashMap = new HashMap();
        if (!System.getenv().containsKey(AWS_CREDENTIALS_PATH)) {
            return hashMap;
        }
        try {
            Map map = (Map) new ObjectMapper().readValue(new FileReader(System.getenv().get(AWS_CREDENTIALS_PATH)), Map.class);
            String str = (String) map.get("aws_access_key_id");
            if (str != null && !str.isEmpty()) {
                hashMap.put("aws.access.key.id", str);
            }
            String str2 = (String) map.get("aws_secret_access_key");
            if (str2 != null && !str2.isEmpty()) {
                hashMap.put("aws.secret.access.key", str2);
            }
            return hashMap;
        } catch (Exception e) {
            e.printStackTrace();
            throw new IllegalArgumentException("AWS credentials file not found.AWS_CREDENTIALS_PATH");
        }
    }
}
