/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteConfiguration;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.s3.S3OutputStreamWrapper;
import org.apache.streams.s3.S3WriterConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Persist writer that converts each {@link StreamsDatum} to a line of text and appends it to a
 * ".tsv" object opened through {@link S3OutputStreamWrapper}.
 *
 * <p>A new output file is started when no file is open or when the bytes written to the current
 * file reach {@code maxFileSize} MiB (as configured in {@link S3WriterConfiguration}).
 *
 * <p>{@link #write(StreamsDatum)}, {@link #resetFile()} and the close helpers all hold this
 * instance's monitor, so file rotation and writes on one instance are serialized.
 *
 * <p>Text is encoded as UTF-8, both for the bytes sent to the output stream and for the byte
 * counters that drive file rotation.
 *
 * <p>NOTE(review): the object metadata map is {@code static}; {@link #setObjectMetaData(Map)}
 * therefore replaces it for every instance in the JVM, not only the one it is called on.
 */
public class S3PersistWriter
implements StreamsPersistWriter,
DatumStatusCountable {
    /** Identifier returned by {@link #getId()}. */
    public static final String STREAMS_ID = "S3PersistWriter";
    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class);
    /** Tab character; not referenced elsewhere in this class. */
    private static final char DELIMITER = '\t';

    /** Metadata handed to every {@link S3OutputStreamWrapper} this class creates (shared by all instances). */
    private static Map<String, String> objectMetaData = new HashMap<String, String>();

    static {
        // Default metadata describing the column layout of each written line.
        objectMetaData.put("line[0]", "id");
        objectMetaData.put("line[1]", "timeStamp");
        objectMetaData.put("line[2]", "metaData");
        objectMetaData.put("line[3]", "document");
    }

    private ObjectMapper objectMapper;
    private AmazonS3Client amazonS3Client;
    private S3WriterConfiguration s3WriterConfiguration;
    /** Path (writer path + file name) of every file started by {@link #resetFile()}, in creation order. */
    private final List<String> writtenFiles = new ArrayList<String>();
    /** Converts datums to lines; assigned in {@link #prepare(Object)}. */
    protected LineReadWriteUtil lineWriterUtil;
    private final AtomicLong totalBytesWritten = new AtomicLong();
    private final AtomicLong bytesWrittenThisFile = new AtomicLong();
    private final AtomicInteger totalRecordsWritten = new AtomicInteger();
    private final AtomicInteger fileLineCounter = new AtomicInteger();
    /** Writer for the file currently being appended to, or {@code null} when no file is open. */
    private OutputStreamWriter currentWriter = null;

    /**
     * Builds a writer whose configuration is detected from the "s3" section of the global
     * streams configuration.
     */
    public S3PersistWriter() {
        this((S3WriterConfiguration)new ComponentConfigurator(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3")));
    }

    /**
     * Builds a writer with the given configuration; the S3 client is created in
     * {@link #prepare(Object)}.
     *
     * @param s3WriterConfiguration configuration to use
     */
    public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
        this.s3WriterConfiguration = s3WriterConfiguration;
    }

    /**
     * Builds a writer that uses a caller-supplied S3 client.
     *
     * @param amazonS3Client client to use; {@link #prepare(Object)} only creates one when this is {@code null}
     * @param s3WriterConfiguration configuration to use
     */
    public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) {
        this.amazonS3Client = amazonS3Client;
        this.s3WriterConfiguration = s3WriterConfiguration;
    }

    /** @return the S3 client in use, or {@code null} if none has been supplied or created yet */
    public AmazonS3Client getAmazonS3Client() {
        return this.amazonS3Client;
    }

    /** @return the configuration this writer was constructed with */
    public S3WriterConfiguration getS3WriterConfiguration() {
        return this.s3WriterConfiguration;
    }

    /** @return the live (mutable) list of file paths started so far */
    public List<String> getWrittenFiles() {
        return this.writtenFiles;
    }

    /** @return the metadata map shared by all instances */
    public Map<String, String> getObjectMetaData() {
        return objectMetaData;
    }

    /** @return the object mapper, or {@code null} if none has been set and {@link #prepare(Object)} has not run */
    public ObjectMapper getObjectMapper() {
        return this.objectMapper;
    }

    /** @param mapper object mapper to use; when left {@code null}, {@link #prepare(Object)} installs a default */
    public void setObjectMapper(ObjectMapper mapper) {
        this.objectMapper = mapper;
    }

    /**
     * Replaces the metadata map. The map is static, so the change is visible to every instance.
     *
     * @param val new metadata map
     */
    public void setObjectMetaData(Map<String, String> val) {
        objectMetaData = val;
    }

    /** @return {@link #STREAMS_ID} */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Converts the datum to a line and appends it to the current file, starting a new file first
     * when none is open or the current one has reached the configured maximum size.
     *
     * <p>Failures are logged rather than thrown. A datum that could not be written (no file could
     * be opened, or the write raised an {@link IOException}) is not added to the byte or record
     * counters.
     *
     * @param streamsDatum datum to persist
     */
    public void write(StreamsDatum streamsDatum) {
        synchronized (this) {
            if (this.currentWriter == null || this.bytesWrittenThisFile.get() >= this.s3WriterConfiguration.getMaxFileSize() * 1024L * 1024L) {
                try {
                    LOGGER.info("Resetting the file");
                    this.currentWriter = this.resetFile();
                }
                catch (Exception ex) {
                    LOGGER.error("Unable to start a new S3 output file", ex);
                }
            }
            if (this.currentWriter == null) {
                // resetFile() closed the previous file and failed to open a new one; a later
                // write() will try again.
                LOGGER.error("No S3 output file is open; dropping datum");
                return;
            }
            String line = this.lineWriterUtil.convertResultToString(streamsDatum);
            try {
                this.currentWriter.write(line);
            }
            catch (IOException ex) {
                LOGGER.error("Failed to write record to the current S3 output file", ex);
                return;
            }
            // Same charset as the writer, so the counters match the bytes actually encoded.
            int recordSize = line.getBytes(StandardCharsets.UTF_8).length;
            this.totalBytesWritten.addAndGet(recordSize);
            this.bytesWrittenThisFile.addAndGet(recordSize);
            this.totalRecordsWritten.incrementAndGet();
            this.fileLineCounter.incrementAndGet();
        }
    }

    /**
     * Closes the current file (if any) and opens a new one named
     * {@code <writerFilePrefix><"/" or "-"><epoch millis>.tsv}, where the separator is "/" when
     * the configuration's {@code chunk} flag is set.
     *
     * <p>If a file is already open and nothing has been written to it yet, that writer is
     * returned unchanged.
     *
     * <p>NOTE(review): this method does not assign the new writer to the instance itself;
     * {@link #write(StreamsDatum)} stores the returned value.
     *
     * @return writer for the file that subsequent records should go to
     * @throws Exception if the new output stream cannot be created; the failure is logged and rethrown
     */
    public synchronized OutputStreamWriter resetFile() throws Exception {
        if (this.fileLineCounter.get() == 0 && this.currentWriter != null) {
            return this.currentWriter;
        }
        this.closeAndDestroyWriter();
        try {
            String separator = this.s3WriterConfiguration.getChunk() ? "/" : "-";
            String fileName = this.s3WriterConfiguration.getWriterFilePrefix() + separator + System.currentTimeMillis() + ".tsv";
            String filePath = this.s3WriterConfiguration.getWriterPath() + fileName;
            S3OutputStreamWrapper outputStream = new S3OutputStreamWrapper(this.amazonS3Client, this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath(), fileName, objectMetaData);
            // Per-file counters restart only once the new stream exists.
            this.fileLineCounter.set(0);
            this.bytesWrittenThisFile.set(0L);
            this.writtenFiles.add(filePath);
            LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), filePath);
            return new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
        }
        catch (Exception ex) {
            LOGGER.error("Failed to create a new S3 output file", ex);
            throw ex;
        }
    }

    /** Flushes and closes the current writer, if any, and clears the reference to it. */
    private synchronized void closeAndDestroyWriter() {
        if (this.currentWriter != null) {
            this.safeFlush(this.currentWriter);
            this.closeSafely(this.currentWriter);
            this.currentWriter = null;
            // getWrittenFiles() exposes the live list, so it may have been emptied by a caller.
            String lastFile = this.writtenFiles.isEmpty() ? "" : this.writtenFiles.get(this.writtenFiles.size() - 1);
            LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), lastFile);
        }
    }

    /**
     * Flushes and closes the writer without propagating failures; a failure is logged at ERROR
     * level because data buffered in the writer may not have been persisted.
     *
     * @param writer writer to close; ignored when {@code null}
     */
    private synchronized void closeSafely(Writer writer) {
        if (writer != null) {
            try {
                writer.flush();
                writer.close();
            }
            catch (Exception ex) {
                LOGGER.error("Failed to flush/close the S3 output file", ex);
            }
            LOGGER.debug("File Closed");
        }
    }

    /**
     * Flushes without propagating {@link IOException}; failures are logged at TRACE level.
     *
     * @param flushable target to flush; ignored when {@code null}
     */
    private void safeFlush(Flushable flushable) {
        if (flushable != null) {
            try {
                flushable.flush();
            }
            catch (IOException ex) {
                LOGGER.trace("safeFlush", ex);
            }
        }
    }

    /**
     * Initializes the line converter, installs a default object mapper when none was set, and
     * creates the S3 client from the configured key, secret key, protocol and (optional) region
     * when none was supplied.
     *
     * @param configurationObject not used by this implementation
     * @throws IllegalArgumentException if no S3 client is available after initialization
     */
    public void prepare(Object configurationObject) {
        this.lineWriterUtil = LineReadWriteUtil.getInstance((LineReadWriteConfiguration)this.s3WriterConfiguration);
        synchronized (this) {
            try {
                if (this.objectMapper == null) {
                    this.objectMapper = StreamsJacksonMapper.getInstance();
                }
                if (this.amazonS3Client == null) {
                    AWSCredentials credentials = new BasicAWSCredentials(this.s3WriterConfiguration.getKey(), this.s3WriterConfiguration.getSecretKey());
                    ClientConfiguration clientConfig = new ClientConfiguration();
                    clientConfig.setProtocol(Protocol.valueOf(this.s3WriterConfiguration.getProtocol().toString()));
                    S3ClientOptions clientOptions = new S3ClientOptions();
                    clientOptions.setPathStyleAccess(false);
                    this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
                    if (StringUtils.isNotEmpty(this.s3WriterConfiguration.getRegion())) {
                        this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(this.s3WriterConfiguration.getRegion())));
                    }
                    this.amazonS3Client.setS3ClientOptions(clientOptions);
                }
            }
            catch (Exception ex) {
                // Passing the throwable as the last argument makes SLF4J print the stack trace.
                LOGGER.error("Exception while preparing the S3 client", ex);
            }
            Preconditions.checkArgument(this.amazonS3Client != null, "S3 client could not be initialized");
        }
    }

    /** Flushes and closes the file currently being written, if any. */
    public void cleanUp() {
        this.closeAndDestroyWriter();
    }

    /**
     * Reports the number of records counted by {@link #write(StreamsDatum)}, recorded as both
     * attempted and {@link DatumStatus#SUCCESS}.
     *
     * @return a new counter populated from the running record total
     */
    public DatumStatusCounter getDatumStatusCounter() {
        // Read once so attempt and success totals agree even while writes are in progress.
        int written = this.totalRecordsWritten.get();
        DatumStatusCounter counters = new DatumStatusCounter();
        counters.incrementAttempt(written);
        counters.incrementStatus(DatumStatus.SUCCESS, written);
        return counters;
    }
}

