package org.apache.apex.malhar.lib.fs.s3;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.io.block.AbstractBlockReader;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.fs.s3.S3InitiateFileUploadOperator;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.class */
public class S3BlockUploadOperator implements Operator, Operator.CheckpointNotificationListener, Operator.IdleTimeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(S3BlockUploadOperator.class);

    /** Name of the S3 bucket that blocks are uploaded to. */
    @NotNull
    private String bucketName;

    /** Access key used to build the credentials of the S3 client. */
    @NotNull
    private String accessKey;

    /** Secret access key used to build the credentials of the S3 client. */
    @NotNull
    private String secretAccessKey;

    /** Optional S3 endpoint; only applied to the client when it is non-null. */
    private String endPoint;

    /** S3 client, assigned from {@code createClient()} during {@code setup}. */
    protected transient AmazonS3 s3Client;

    /** Id of the window currently being processed, recorded in {@code beginWindow}. */
    private transient long currentWindowId;

    /** Block records that arrived before their file path or upload metadata was known. */
    private transient List<AbstractBlockReader.ReaderRecord<Slice>> waitingTuples;

    /**
     * Results of the blocks uploaded in the current window, keyed by unique block id.
     * Saved through the window data manager and cleared in {@code endWindow}.
     */
    private transient Map<String, UploadBlockMetadata> currentWindowRecoveryState;

    /**
     * Upload metadata per unique block id (see {@code getUniqueBlockIdFromFile}).
     * Not transient, unlike the other maps of this operator.
     */
    private Map<String, S3BlockMetaData> blockInfo = new HashMap<>();

    /** File path per block id, filled from the {@code blockMetadataInput} port. */
    private transient Map<Long, String> blockIdToFilePath = new HashMap<>();

    /** Stores and retrieves the per-window recovery state used for replay. */
    private WindowDataManager windowDataManager = new FSWindowDataManager();

    /** Output port on which the metadata of every uploaded block is emitted. */
    public final transient DefaultOutputPort<UploadBlockMetadata> output = new DefaultOutputPort<>();
    /**
     * Input port for block data records; every record is passed on to
     * {@code uploadBlockIntoS3}.
     */
    public final transient DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>> blockInput =
        new DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>>() {
            public void process(AbstractBlockReader.ReaderRecord<Slice> tuple) {
                uploadBlockIntoS3(tuple);
            }
        };
    /**
     * Input port for block metadata. Records the file path of each block id,
     * unless the current window is not newer than the largest completed window.
     */
    public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput =
        new DefaultInputPort<BlockMetadata.FileBlockMetadata>() {
            public void process(BlockMetadata.FileBlockMetadata tuple) {
                boolean windowAlreadyCompleted =
                    currentWindowId <= windowDataManager.getLargestCompletedWindow();
                if (!windowAlreadyCompleted) {
                    blockIdToFilePath.put(Long.valueOf(tuple.getBlockId()), tuple.getFilePath());
                    LOG.debug("received blockId {} for file {} ", Long.valueOf(tuple.getBlockId()), tuple.getFilePath());
                }
            }
        };
    /**
     * Input port for per-file upload metadata; every tuple is passed on to
     * {@code processUploadFileMetadata}.
     */
    public final transient DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata> uploadMetadataInput =
        new DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata>() {
            public void process(S3InitiateFileUploadOperator.UploadFileMetadata tuple) {
                processUploadFileMetadata(tuple);
            }
        };

    /* loaded from: input_file:org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator$S3BlockMetaData.class */
    /**
     * Upload coordinates of a single block: the S3 key it belongs to, the
     * upload id, its part number, and whether it is the last block of the file.
     */
    public static class S3BlockMetaData {
        private String keyName;
        private String uploadId;
        private Integer partNo;
        private boolean isLastBlock;

        /** Creates an empty instance; all reference fields stay {@code null}. */
        public S3BlockMetaData() {
        }

        /**
         * Creates metadata for a block that is not (yet) flagged as the last block.
         *
         * @param keyName  key name of the target object
         * @param uploadId upload id the block belongs to
         * @param partNo   part number of the block
         */
        public S3BlockMetaData(String keyName, String uploadId, Integer partNo) {
            this.isLastBlock = false;
            this.partNo = partNo;
            this.uploadId = uploadId;
            this.keyName = keyName;
        }

        /** @return key name of the target object */
        public String getKeyName() {
            return keyName;
        }

        /** @return upload id the block belongs to */
        public String getUploadId() {
            return uploadId;
        }

        /** @return part number of the block */
        public Integer getPartNo() {
            return partNo;
        }

        /** @return {@code true} when the block was flagged as the last block */
        public boolean isLastBlock() {
            return isLastBlock;
        }

        /** @param lastBlock new value of the last-block flag */
        public void setLastBlock(boolean lastBlock) {
            isLastBlock = lastBlock;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator$UploadBlockMetadata.class */
    /**
     * Result of uploading one block: the part ETag returned by S3 and the key
     * name of the object the block belongs to.
     */
    public static class UploadBlockMetadata {

        @FieldSerializer.Bind(JavaSerializer.class)
        private PartETag partETag;
        private String keyName;

        /** Creates an empty instance; both fields stay {@code null}. */
        public UploadBlockMetadata() {
        }

        /**
         * @param partETag part ETag of the uploaded block
         * @param str      key name of the object the block belongs to
         */
        public UploadBlockMetadata(PartETag partETag, String str) {
            this.partETag = partETag;
            this.keyName = str;
        }

        /** @return part ETag of the uploaded block */
        public PartETag getPartETag() {
            return this.partETag;
        }

        /** @return key name of the object the block belongs to */
        public String getKeyName() {
            return this.keyName;
        }

        /**
         * Hashes on the key name only, so all blocks of one object share a hash.
         * Returns 0 when the key name is {@code null} (e.g. for an instance built
         * by the no-arg constructor) instead of throwing a NullPointerException.
         * NOTE(review): presumably relied upon for tuple partitioning downstream,
         * which is why {@code equals} is intentionally left untouched; confirm.
         */
        @Override
        public int hashCode() {
            return this.keyName == null ? 0 : this.keyName.hashCode();
        }
    }

    /**
     * Registers upload metadata for the blocks of a file.
     *
     * <p>Blocks are registered in order with 1-based part numbers. Registration
     * stops at the first block that is already known. The final block of the
     * file is then flagged as the last block, if it is currently registered.
     *
     * @param uploadFileMetadata upload metadata of the file (key name, upload id, block ids)
     */
    protected void processUploadFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata uploadFileMetadata) {
        long[] blockIds = uploadFileMetadata.getFileMetadata().getBlockIds();
        String filePath = uploadFileMetadata.getFileMetadata().getFilePath();
        for (int i = 0; i < blockIds.length; i++) {
            String uniqueBlockId = getUniqueBlockIdFromFile(blockIds[i], filePath);
            if (this.blockInfo.get(uniqueBlockId) != null) {
                break;
            }
            this.blockInfo.put(uniqueBlockId, new S3BlockMetaData(uploadFileMetadata.getKeyName(), uploadFileMetadata.getUploadId(), Integer.valueOf(i + 1)));
        }
        if (blockIds.length > 0) {
            // The loop above may stop early, so the last block is not guaranteed to be
            // registered (it may also have been removed after a successful upload).
            // Guard against the resulting null instead of failing with an NPE.
            S3BlockMetaData lastBlock = this.blockInfo.get(getUniqueBlockIdFromFile(blockIds[blockIds.length - 1], filePath));
            if (lastBlock != null) {
                lastBlock.setLastBlock(true);
            }
        }
    }

    /**
     * Builds the identifier used to key a block across files, in the form
     * {@code <blockId>|<filePath>}.
     *
     * @param blockId  id of the block
     * @param filePath path of the file the block belongs to
     * @return the block id and the file path joined by {@code '|'}
     */
    public static String getUniqueBlockIdFromFile(long blockId, String filePath) {
        StringBuilder uniqueId = new StringBuilder();
        uniqueId.append(blockId);
        uniqueId.append("|");
        uniqueId.append(filePath);
        return uniqueId.toString();
    }

    /**
     * Prepares the transient state: creates the recovery map and the list of
     * waiting tuples, sets up the window data manager and creates the S3 client.
     *
     * @param context operator context handed on to the window data manager
     */
    public void setup(Context.OperatorContext context) {
        currentWindowRecoveryState = new HashMap<>();
        waitingTuples = new ArrayList<>();
        windowDataManager.setup(context);
        s3Client = createClient();
    }

    /**
     * Creates the S3 client from the configured access key and secret access
     * key; the endpoint is set on the client only when one is configured.
     *
     * @return the newly created client
     */
    protected AmazonS3 createClient() {
        BasicAWSCredentials credentials = new BasicAWSCredentials(accessKey, secretAccessKey);
        AmazonS3Client client = new AmazonS3Client(credentials);
        if (endPoint == null) {
            return client;
        }
        client.setEndpoint(endPoint);
        return client;
    }

    /**
     * Records the id of the window being started and replays it when it is not
     * newer than the largest window already completed.
     *
     * @param windowId id of the window being started
     */
    public void beginWindow(long windowId) {
        currentWindowId = windowId;
        boolean alreadyCompleted = windowId <= windowDataManager.getLargestCompletedWindow();
        if (alreadyCompleted) {
            replay(windowId);
        }
    }

    /**
     * Replays a previously completed window: every upload result stored for the
     * window is emitted again and the corresponding block is dropped from
     * {@code blockInfo}. Does nothing when no state was stored for the window.
     *
     * @param windowId id of the window to replay
     * @throws RuntimeException wrapping the {@link IOException} raised while
     *         retrieving the stored state
     */
    protected void replay(long windowId) {
        try {
            // The state saved in endWindow() is the Map<String, UploadBlockMetadata>
            // recovery map, so the cast restores the type it was stored with.
            @SuppressWarnings("unchecked")
            Map<String, UploadBlockMetadata> recoveredState =
                (Map<String, UploadBlockMetadata>) this.windowDataManager.retrieve(windowId);
            if (recoveredState == null) {
                return;
            }
            for (Map.Entry<String, UploadBlockMetadata> entry : recoveredState.entrySet()) {
                this.output.emit(entry.getValue());
                this.blockInfo.remove(entry.getKey());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Finishes the current window.
     *
     * <p>Retries the waiting tuples, forgets every block uploaded in this window,
     * logs and drops the block ids that are still pending, saves the recovery
     * state when the window is newer than the largest completed one, and finally
     * clears the recovery state.
     *
     * @throws RuntimeException when the recovery state cannot be saved
     */
    public void endWindow() {
        if (!waitingTuples.isEmpty()) {
            processWaitBlocks();
        }

        for (String uniqueBlockId : currentWindowRecoveryState.keySet()) {
            // Keys have the form "<blockId>|<filePath>"; the id is the part before the first '|'.
            int separatorIndex = uniqueBlockId.indexOf("|");
            Long blockId = Long.valueOf(Long.parseLong(uniqueBlockId.substring(0, separatorIndex)));
            LOG.debug("Successfully uploaded {} block", blockId);
            blockIdToFilePath.remove(blockId);
            blockInfo.remove(uniqueBlockId);
        }

        if (!blockIdToFilePath.isEmpty()) {
            for (Long pendingBlockId : blockIdToFilePath.keySet()) {
                LOG.info("Unable to uploaded {} block", pendingBlockId);
            }
            blockIdToFilePath.clear();
        }

        boolean windowNotYetCompleted = currentWindowId > windowDataManager.getLargestCompletedWindow();
        if (windowNotYetCompleted) {
            try {
                windowDataManager.save(currentWindowRecoveryState, currentWindowId);
            } catch (IOException saveFailure) {
                throw new RuntimeException("Unable to save recovery", saveFailure);
            }
        }
        currentWindowRecoveryState.clear();
    }

    /** Tears down the window data manager. */
    public void teardown() {
        windowDataManager.teardown();
    }

    /**
     * Walks over the waiting tuples and uploads those whose file path and upload
     * metadata have become available, removing them from the waiting list.
     */
    private void processWaitBlocks() {
        for (Iterator<AbstractBlockReader.ReaderRecord<Slice>> pending = waitingTuples.iterator(); pending.hasNext(); ) {
            AbstractBlockReader.ReaderRecord<Slice> record = pending.next();
            String filePath = blockIdToFilePath.get(Long.valueOf(record.getBlockId()));
            if (filePath == null) {
                continue;
            }
            if (blockInfo.get(getUniqueBlockIdFromFile(record.getBlockId(), filePath)) == null) {
                continue;
            }
            uploadBlockIntoS3(record);
            // Removal goes through the iterator so the list can be modified while iterating.
            pending.remove();
        }
    }

    /**
     * Uploads one block to S3 and emits the resulting {@link UploadBlockMetadata}.
     *
     * <p>Nothing happens when the current window is not newer than the largest
     * completed window. When the file path or the upload metadata of the block is
     * not known yet, the record is parked in the waiting list (once) and retried
     * later. A file consisting of a single block (part number 1 that is also the
     * last block) is stored with a plain put-object request; every other block is
     * sent as one part of a multipart upload.
     *
     * <p>Only the bytes described by the slice ({@code offset}, {@code length})
     * are uploaded, so a slice that is a window into a larger buffer is sent
     * correctly and the declared size matches the stream content.
     *
     * @param readerRecord block id plus the block's bytes
     * @throws RuntimeException when the part upload fails or the stream cannot be closed
     */
    protected void uploadBlockIntoS3(AbstractBlockReader.ReaderRecord<Slice> readerRecord) {
        if (this.currentWindowId <= this.windowDataManager.getLargestCompletedWindow()) {
            return;
        }

        String filePath = this.blockIdToFilePath.get(Long.valueOf(readerRecord.getBlockId()));
        String uniqueBlockId = null;
        S3BlockMetaData blockMetaData = null;
        if (filePath != null) {
            uniqueBlockId = getUniqueBlockIdFromFile(readerRecord.getBlockId(), filePath);
            blockMetaData = this.blockInfo.get(uniqueBlockId);
        }
        if (blockMetaData == null) {
            // File path or upload metadata has not arrived yet: park the record, but only once.
            if (!this.waitingTuples.contains(readerRecord)) {
                this.waitingTuples.add(readerRecord);
            }
            return;
        }

        Slice block = readerRecord.getRecord();
        long blockSize = block.length;
        PartETag partETag;
        // Honour the slice's offset/length instead of streaming the whole backing array.
        try (ByteArrayInputStream blockStream = new ByteArrayInputStream(block.buffer, block.offset, block.length)) {
            if (blockMetaData.isLastBlock() && blockMetaData.getPartNo().intValue() == 1) {
                ObjectMetadata objectMetadata = createObjectMetadata();
                objectMetadata.setContentLength(blockSize);
                PutObjectRequest putRequest = new PutObjectRequest(this.bucketName, blockMetaData.getKeyName(), blockStream, objectMetadata);
                partETag = new PartETag(1, this.s3Client.putObject(putRequest).getETag());
            } else {
                try {
                    UploadPartRequest partRequest = new UploadPartRequest()
                        .withBucketName(this.bucketName)
                        .withKey(blockMetaData.getKeyName())
                        .withUploadId(blockMetaData.getUploadId())
                        .withPartNumber(blockMetaData.getPartNo().intValue())
                        .withInputStream(blockStream)
                        .withPartSize(blockSize);
                    partETag = this.s3Client.uploadPart(partRequest).getPartETag();
                } catch (Exception uploadFailure) {
                    throw new RuntimeException(uploadFailure);
                }
            }
        } catch (IOException closeFailure) {
            throw new RuntimeException(closeFailure);
        }

        UploadBlockMetadata uploadBlockMetadata = new UploadBlockMetadata(partETag, blockMetaData.getKeyName());
        this.output.emit(uploadBlockMetadata);
        this.currentWindowRecoveryState.put(uniqueBlockId, uploadBlockMetadata);
    }

    /**
     * Creates the object metadata used when a single-block file is stored with a
     * put-object request.
     *
     * @return a new, empty {@code ObjectMetadata}
     */
    public ObjectMetadata createObjectMetadata() {
        ObjectMetadata metadata = new ObjectMetadata();
        return metadata;
    }

    /**
     * Checkpoint callback; this operator does nothing here.
     *
     * @param windowId window id passed by the caller (unused)
     */
    public void beforeCheckpoint(long windowId) {
        // intentionally empty
    }

    /**
     * Checkpoint callback; this operator does nothing here.
     *
     * @param windowId window id passed by the caller (unused)
     */
    public void checkpointed(long windowId) {
        // intentionally empty
    }

    /**
     * Forwards the committed notification to the window data manager.
     *
     * @param windowId id of the committed window
     * @throws RuntimeException wrapping the {@link IOException} raised by the
     *         window data manager
     */
    public void committed(long windowId) {
        try {
            windowDataManager.committed(windowId);
        } catch (IOException commitFailure) {
            throw new RuntimeException(commitFailure);
        }
    }

    /** Retries the waiting tuples, if there are any. */
    public void handleIdleTime() {
        if (waitingTuples.isEmpty()) {
            return;
        }
        processWaitBlocks();
    }

    /** @return name of the bucket that blocks are uploaded to */
    public String getBucketName() {
        return bucketName;
    }

    /**
     * @param bucketName name of the bucket that blocks are uploaded to
     * @throws NullPointerException when {@code bucketName} is {@code null}
     */
    public void setBucketName(@NotNull String bucketName) {
        this.bucketName = Preconditions.checkNotNull(bucketName);
    }

    /** @return access key used for the S3 client credentials */
    public String getAccessKey() {
        return accessKey;
    }

    /**
     * @param accessKey access key used for the S3 client credentials
     * @throws NullPointerException when {@code accessKey} is {@code null}
     */
    public void setAccessKey(@NotNull String accessKey) {
        this.accessKey = Preconditions.checkNotNull(accessKey);
    }

    /** @return secret access key used for the S3 client credentials */
    public String getSecretAccessKey() {
        return secretAccessKey;
    }

    /**
     * @param secretAccessKey secret access key used for the S3 client credentials
     * @throws NullPointerException when {@code secretAccessKey} is {@code null}
     */
    public void setSecretAccessKey(@NotNull String secretAccessKey) {
        this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
    }

    /** @return configured S3 endpoint, or {@code null} when none is set */
    public String getEndPoint() {
        return endPoint;
    }

    /** @param endPoint S3 endpoint to set on the client; may be {@code null} */
    public void setEndPoint(String endPoint) {
        this.endPoint = endPoint;
    }
}
