package org.apache.apex.malhar.lib.fs.s3;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractReconciler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/lib/fs/s3/S3Reconciler.class */
public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.OutputMetaData, FSRecordCompactionOperator.OutputMetaData> {

    @NotNull
    private String accessKey;

    @NotNull
    private String secretKey;

    @NotNull
    private String bucketName;
    private String region;

    @NotNull
    private String directoryName;
    protected transient AmazonS3 s3client;
    protected transient FileSystem fs;
    protected transient String filePath;
    private static final String TMP_EXTENSION = ".tmp";
    public final transient DefaultOutputPort<FSRecordCompactionOperator.OutputMetaData> outputPort = new DefaultOutputPort<>();
    private static final Logger logger = LoggerFactory.getLogger(S3Reconciler.class);

    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void setup(Context.OperatorContext operatorContext) {
        this.s3client = new AmazonS3Client(new BasicAWSCredentials(this.accessKey, this.secretKey));
        if (this.region != null) {
            this.s3client.setRegion(Region.getRegion(Regions.fromName(this.region)));
        }
        this.filePath = (String) operatorContext.getValue(DAG.APPLICATION_PATH);
        try {
            this.fs = FileSystem.newInstance(new Path(this.filePath).toUri(), new Configuration());
        } catch (IOException e) {
            logger.error("Unable to create FileSystem: {}", e.getMessage());
        }
        super.setup(operatorContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void processTuple(FSRecordCompactionOperator.OutputMetaData outputMetaData) {
        logger.debug("enque : {}", outputMetaData);
        enqueueForProcessing(outputMetaData);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void processCommittedData(FSRecordCompactionOperator.OutputMetaData outputMetaData) {
        try {
            Path path = new Path(outputMetaData.getPath());
            if (!this.fs.exists(path)) {
                logger.debug("Ignoring non-existent path assuming replay : {}", path);
                return;
            }
            FSDataInputStream open = this.fs.open(path);
            ObjectMetadata objectMetadata = new ObjectMetadata();
            objectMetadata.setContentLength(outputMetaData.getSize());
            String str = this.directoryName + "/" + outputMetaData.getFileName();
            PutObjectRequest putObjectRequest = new PutObjectRequest(this.bucketName, str, open, objectMetadata);
            if (outputMetaData.getSize() >= 2147483647L) {
                throw new RuntimeException("PutRequestSize greater than Integer.MAX_VALUE");
            }
            putObjectRequest.getRequestClientOptions().setReadLimit((int) outputMetaData.getSize());
            if (this.fs.exists(path)) {
                logger.debug("File {} Uploaded at {}", str, this.s3client.putObject(putObjectRequest).getETag());
            }
        } catch (FileNotFoundException e) {
            logger.debug("Ignoring non-existent path assuming replay : {}", outputMetaData.getPath());
        } catch (IOException e2) {
            logger.error("Unable to create Stream: {}", e2.getMessage());
        }
    }

    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void endWindow() {
        while (this.doneTuples.peek() != null) {
            removeIntermediateFiles((FSRecordCompactionOperator.OutputMetaData) this.doneTuples.poll());
        }
    }

    protected void removeIntermediateFiles(FSRecordCompactionOperator.OutputMetaData outputMetaData) {
        logger.debug("found metaData = {}", outputMetaData);
        this.committedTuples.remove(outputMetaData);
        try {
            for (FileStatus fileStatus : this.fs.listStatus(new Path(outputMetaData.getPath()).getParent())) {
                String name = fileStatus.getPath().getName();
                if (name.endsWith(TMP_EXTENSION) && name.startsWith(outputMetaData.getFileName())) {
                    String substring = name.substring(0, name.lastIndexOf(46, name.lastIndexOf(46) - 1));
                    logger.debug("actualFileName = {}", substring);
                    if (outputMetaData.getFileName().equals(substring)) {
                        logger.debug("deleting stray file {}", name);
                        this.fs.delete(fileStatus.getPath(), true);
                    }
                } else if (name.equals(outputMetaData.getFileName())) {
                    logger.info("deleting s3-compaction file {}", name);
                    this.fs.delete(fileStatus.getPath(), true);
                }
            }
        } catch (IOException e) {
            logger.error("Unable to Delete a file: {}", outputMetaData.getFileName());
        }
    }

    public String getAccessKey() {
        return this.accessKey;
    }

    public void setAccessKey(@NotNull String str) {
        this.accessKey = (String) Preconditions.checkNotNull(str);
    }

    public String getSecretKey() {
        return this.secretKey;
    }

    public void setSecretKey(@NotNull String str) {
        this.secretKey = (String) Preconditions.checkNotNull(str);
    }

    public String getBucketName() {
        return this.bucketName;
    }

    public void setBucketName(@NotNull String str) {
        this.bucketName = (String) Preconditions.checkNotNull(str);
    }

    public String getDirectoryName() {
        return this.directoryName;
    }

    public void setDirectoryName(@NotNull String str) {
        this.directoryName = (String) Preconditions.checkNotNull(str);
    }

    public String getRegion() {
        return this.region;
    }

    public void setRegion(String str) {
        this.region = str;
    }

    @VisibleForTesting
    void setS3client(AmazonS3 amazonS3) {
        this.s3client = amazonS3;
    }
}
