package org.apache.eventmesh.connector.s3.source.connector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.eventmesh.connector.s3.source.config.S3SourceConfig;
import org.apache.eventmesh.connector.s3.source.config.SourceConnectorConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

/* loaded from: input_file:org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.class */
public class S3SourceConnector implements Source {
    private static final Logger log = LoggerFactory.getLogger(S3SourceConnector.class);
    public static final String REGION = "region";
    public static final String BUCKET = "bucket";
    public static final String FILE_NAME = "fileName";
    public static final String POSITION = "position";
    private S3SourceConfig sourceConfig;
    private SourceConnectorConfig sourceConnectorConfig;
    private int eachRecordSize;
    private long fileSize;
    private S3AsyncClient s3Client;
    private long position;

    public Class<? extends Config> configClass() {
        return S3SourceConfig.class;
    }

    public void init(Config config) throws Exception {
        this.sourceConfig = (S3SourceConfig) config;
        doInit();
    }

    public void init(ConnectorContext connectorContext) throws Exception {
        this.sourceConfig = (S3SourceConfig) ((SourceConnectorContext) connectorContext).getSourceConfig();
        doInit();
    }

    private void doInit() {
        this.sourceConnectorConfig = this.sourceConfig.getSourceConnectorConfig();
        this.eachRecordSize = calculateEachRecordSize();
        AwsBasicCredentials create = AwsBasicCredentials.create(this.sourceConnectorConfig.getAccessKey(), this.sourceConnectorConfig.getSecretKey());
        this.s3Client = (S3AsyncClient) S3AsyncClient.builder().credentialsProvider(() -> {
            return create;
        }).region(Region.of(this.sourceConnectorConfig.getRegion())).build();
    }

    private int calculateEachRecordSize() {
        return this.sourceConnectorConfig.getSchema().values().stream().reduce((num, num2) -> {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }).orElse(0).intValue();
    }

    public void start() throws Exception {
        CompletableFuture headObject = this.s3Client.headObject(builder -> {
            builder.bucket(this.sourceConnectorConfig.getBucket()).key(this.sourceConnectorConfig.getFileName());
        });
        headObject.get(this.sourceConnectorConfig.getTimeout(), TimeUnit.MILLISECONDS);
        this.fileSize = ((HeadObjectResponse) headObject.get()).contentLength().longValue();
    }

    public void commit(ConnectRecord connectRecord) {
    }

    public String name() {
        return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
    }

    public void stop() throws Exception {
    }

    public List<ConnectRecord> poll() {
        if (this.position >= this.fileSize) {
            return Collections.EMPTY_LIST;
        }
        long j = this.position;
        long min = Math.min(this.fileSize, this.position + (this.eachRecordSize * this.sourceConnectorConfig.getBatchSize().intValue())) - 1;
        try {
            byte[] asByteArray = ((ResponseBytes) this.s3Client.getObject((GetObjectRequest) GetObjectRequest.builder().bucket(this.sourceConnectorConfig.getBucket()).key(this.sourceConnectorConfig.getFileName()).range("bytes=" + j + "-" + min).build(), AsyncResponseTransformer.toBytes()).get(this.sourceConnectorConfig.getTimeout(), TimeUnit.MILLISECONDS)).asByteArray();
            ArrayList arrayList = new ArrayList(asByteArray.length / this.eachRecordSize);
            int i = 0;
            while (true) {
                int i2 = i;
                if (i2 >= asByteArray.length) {
                    return arrayList;
                }
                byte[] bArr = new byte[this.eachRecordSize];
                System.arraycopy(asByteArray, i2, bArr, 0, this.eachRecordSize);
                this.position += this.eachRecordSize;
                arrayList.add(new ConnectRecord(getRecordPartition(), getRecordOffset(), Long.valueOf(System.currentTimeMillis()), bArr));
                i = i2 + this.eachRecordSize;
            }
        } catch (Exception e) {
            log.error("poll records from S3 file, poll range {}-{}, but failed", new Object[]{Long.valueOf(j), Long.valueOf(min), e});
            return Collections.EMPTY_LIST;
        }
    }

    private RecordPartition getRecordPartition() {
        HashMap hashMap = new HashMap();
        hashMap.put(REGION, this.sourceConnectorConfig.getRegion());
        hashMap.put(BUCKET, this.sourceConnectorConfig.getBucket());
        hashMap.put(FILE_NAME, this.sourceConnectorConfig.getFileName());
        return new RecordPartition(hashMap);
    }

    private RecordOffset getRecordOffset() {
        HashMap hashMap = new HashMap();
        hashMap.put(POSITION, String.valueOf(this.position));
        return new RecordOffset(hashMap);
    }
}
