package org.apache.apex.malhar.lib.fs.s3;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.fs.s3.S3BlockUploadOperator;
import org.apache.apex.malhar.lib.fs.s3.S3InitiateFileUploadOperator;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that finishes S3 multipart uploads once all parts of a file are known.
 *
 * <p>Per-file upload metadata arrives on {@link #filesMetadataInput} and per-block part ETags
 * arrive on {@link #uploadMetadataInput}; the two may arrive in either order. As soon as the
 * number of collected part ETags for a key equals the block count reported by the file metadata,
 * the file is considered complete: files with more than one part are finalized with
 * {@code completeMultipartUpload}, files with at most one part are only marked as uploaded.
 * Bookkeeping for finished files is dropped in {@link #endWindow()}.
 */
@InterfaceStability.Evolving
public class S3FileMerger implements Operator, Operator.CheckpointNotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(S3FileMerger.class);

    /** Name of the bucket the multipart uploads belong to. */
    @NotNull
    private String bucketName;

    /** AWS access key used to build the client credentials. */
    @NotNull
    private String accessKey;

    /** AWS secret access key used to build the client credentials. */
    @NotNull
    private String secretAccessKey;

    /** Optional endpoint override; applied to the client only when non-null. */
    private String endPoint;

    /** Id of the window most recently passed to {@link #beginWindow(long)}. */
    protected transient long currentWindowId;

    /** S3 client created in {@link #setup(Context.OperatorContext)}. */
    protected transient AmazonS3 s3Client;

    /** Keys finished during the current window; their bookkeeping is removed in {@link #endWindow()}. */
    protected transient List<String> uploadedFiles = new ArrayList<>();

    private WindowDataManager windowDataManager = new FSWindowDataManager();

    /** Part ETags collected so far, keyed by object key name. */
    @FieldSerializer.Bind(JavaSerializer.class)
    private Map<String, List<PartETag>> uploadParts = new HashMap<>();

    /** File-level upload metadata received so far, keyed by object key name. */
    private Map<String, S3InitiateFileUploadOperator.UploadFileMetadata> fileMetadatas = new HashMap<>();

    /** Input port receiving the metadata of each uploaded block. */
    public final transient DefaultInputPort<S3BlockUploadOperator.UploadBlockMetadata> uploadMetadataInput = new DefaultInputPort<S3BlockUploadOperator.UploadBlockMetadata>() {
        public void process(S3BlockUploadOperator.UploadBlockMetadata uploadBlockMetadata) {
            S3FileMerger.this.processUploadBlock(uploadBlockMetadata);
        }
    };

    /** Input port receiving the upload metadata of each file. */
    public final transient DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata> filesMetadataInput = new DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata>() {
        public void process(S3InitiateFileUploadOperator.UploadFileMetadata uploadFileMetadata) {
            S3FileMerger.this.processFileMetadata(uploadFileMetadata);
        }
    };

    /**
     * Records the part ETag of an uploaded block and, if the file metadata for the same key is
     * already known, checks whether the file can be finalized.
     *
     * @param uploadBlockMetadata metadata of the uploaded block
     */
    protected void processUploadBlock(S3BlockUploadOperator.UploadBlockMetadata uploadBlockMetadata) {
        final String keyName = uploadBlockMetadata.getKeyName();
        List<PartETag> partETags = this.uploadParts.get(keyName);
        if (partETags == null) {
            // First part seen for this key: start a new list.
            partETags = new ArrayList<>();
            this.uploadParts.put(keyName, partETags);
        }
        partETags.add(uploadBlockMetadata.getPartETag());
        if (this.fileMetadatas.get(keyName) != null) {
            verifyAndEmitFileMerge(keyName);
        }
    }

    /**
     * Stores the upload metadata of a file and, if at least one part for the same key has
     * already been recorded, checks whether the file can be finalized.
     *
     * @param uploadFileMetadata upload metadata of the file
     */
    protected void processFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata uploadFileMetadata) {
        final String keyName = uploadFileMetadata.getKeyName();
        this.fileMetadatas.put(keyName, uploadFileMetadata);
        if (this.uploadParts.get(keyName) != null) {
            verifyAndEmitFileMerge(keyName);
        }
    }

    /**
     * Sets up the window data manager and creates the S3 client.
     *
     * @param operatorContext context handed to the window data manager
     */
    public void setup(Context.OperatorContext operatorContext) {
        this.windowDataManager.setup(operatorContext);
        this.s3Client = createClient();
    }

    /**
     * Builds the S3 client from the configured access key and secret key, applying the
     * endpoint override when one is set. Subclasses may override this to supply another client.
     *
     * @return the client used to complete multipart uploads
     */
    protected AmazonS3 createClient() {
        AmazonS3Client client = new AmazonS3Client(new BasicAWSCredentials(this.accessKey, this.secretAccessKey));
        if (this.endPoint != null) {
            client.setEndpoint(this.endPoint);
        }
        return client;
    }

    /**
     * Remembers the given window id as the current window.
     *
     * @param windowId id of the window
     */
    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
    }

    /**
     * Drops the bookkeeping of every file finished during this window and, when the current
     * window id is greater than the largest window already completed according to the window
     * data manager, saves a marker for the current window.
     *
     * @throws RuntimeException if the window data manager fails to save
     */
    public void endWindow() {
        if (!this.uploadedFiles.isEmpty()) {
            for (String keyName : this.uploadedFiles) {
                this.uploadParts.remove(keyName);
                this.fileMetadatas.remove(keyName);
            }
            this.uploadedFiles.clear();
        }
        if (this.currentWindowId > this.windowDataManager.getLargestCompletedWindow()) {
            try {
                this.windowDataManager.save("Uploaded Files", this.currentWindowId);
            } catch (IOException e) {
                throw new RuntimeException("Unable to save recovery", e);
            }
        }
    }

    /**
     * Tears down the window data manager and releases the S3 client's resources.
     *
     * <p>The client is shut down only when it is an {@link AmazonS3Client}; other
     * {@link AmazonS3} implementations returned by an overridden {@link #createClient()} are
     * left untouched. The shutdown runs in a {@code finally} block so the client is released
     * even if the window data manager teardown throws.
     */
    public void teardown() {
        try {
            this.windowDataManager.teardown();
        } finally {
            // instanceof is also false for null, which covers a setup() that never completed.
            if (this.s3Client instanceof AmazonS3Client) {
                ((AmazonS3Client) this.s3Client).shutdown();
            }
        }
    }

    /**
     * Finalizes the upload of the given key if all of its parts have been recorded.
     *
     * <p>Nothing happens when the current window is not newer than the largest completed
     * window, when either the parts or the file metadata are missing, or when the number of
     * recorded parts differs from the file's block count. A file with at most one part is only
     * marked as uploaded; otherwise the multipart upload is completed and the file is marked as
     * uploaded when the result carries a non-null ETag.
     *
     * @param keyName object key of the file to check
     */
    private void verifyAndEmitFileMerge(String keyName) {
        // NOTE(review): when this early return is taken the key is never added to uploadedFiles,
        // so endWindow() does not evict its entries from uploadParts/fileMetadatas. Confirm
        // whether entries completed during such windows are expected to linger.
        if (this.currentWindowId <= this.windowDataManager.getLargestCompletedWindow()) {
            return;
        }
        S3InitiateFileUploadOperator.UploadFileMetadata uploadFileMetadata = this.fileMetadatas.get(keyName);
        List<PartETag> partETags = this.uploadParts.get(keyName);
        if (partETags == null || uploadFileMetadata == null
                || uploadFileMetadata.getFileMetadata().getNumberOfBlocks() != partETags.size()) {
            return;
        }
        if (partETags.size() <= 1) {
            // No multipart completion call is issued for files with zero or one part.
            this.uploadedFiles.add(keyName);
            LOG.debug("Uploaded file {} successfully", keyName);
            return;
        }
        CompleteMultipartUploadRequest request =
                new CompleteMultipartUploadRequest(this.bucketName, keyName, uploadFileMetadata.getUploadId(), partETags);
        if (this.s3Client.completeMultipartUpload(request).getETag() != null) {
            this.uploadedFiles.add(keyName);
            LOG.debug("Uploaded file {} successfully", keyName);
        }
    }

    /**
     * No-op checkpoint callback.
     *
     * @param windowId id of the window (unused)
     */
    public void beforeCheckpoint(long windowId) {
    }

    /**
     * No-op checkpoint callback.
     *
     * @param windowId id of the window (unused)
     */
    public void checkpointed(long windowId) {
    }

    /**
     * Forwards the committed window id to the window data manager.
     *
     * @param windowId id of the committed window
     * @throws RuntimeException wrapping any {@link IOException} raised by the window data manager
     */
    public void committed(long windowId) {
        try {
            this.windowDataManager.committed(windowId);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the configured bucket name
     */
    public String getBucketName() {
        return this.bucketName;
    }

    /**
     * @param bucketName bucket name; must not be null
     * @throws NullPointerException if {@code bucketName} is null
     */
    public void setBucketName(@NotNull String bucketName) {
        this.bucketName = Preconditions.checkNotNull(bucketName);
    }

    /**
     * @return the configured AWS access key
     */
    public String getAccessKey() {
        return this.accessKey;
    }

    /**
     * @param accessKey AWS access key; must not be null
     * @throws NullPointerException if {@code accessKey} is null
     */
    public void setAccessKey(@NotNull String accessKey) {
        this.accessKey = Preconditions.checkNotNull(accessKey);
    }

    /**
     * @return the configured AWS secret access key
     */
    public String getSecretAccessKey() {
        return this.secretAccessKey;
    }

    /**
     * @param secretAccessKey AWS secret access key; must not be null
     * @throws NullPointerException if {@code secretAccessKey} is null
     */
    public void setSecretAccessKey(@NotNull String secretAccessKey) {
        this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
    }

    /**
     * @return the endpoint override, or null when none is set
     */
    public String getEndPoint() {
        return this.endPoint;
    }

    /**
     * @param endPoint endpoint override for the S3 client; null leaves the client default
     */
    public void setEndPoint(String endPoint) {
        this.endPoint = endPoint;
    }
}
