/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.streams.converter.LineReadWriteConfiguration;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.s3.S3PersistReaderTask;
import org.apache.streams.s3.S3ReaderConfiguration;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3PersistReader
implements StreamsPersistReader,
DatumStatusCountable {
    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
    public static final String STREAMS_ID = "S3PersistReader";
    protected static final char DELIMITER = '\t';
    private S3ReaderConfiguration s3ReaderConfiguration;
    private AmazonS3Client amazonS3Client;
    private ObjectMapper mapper = new ObjectMapper();
    protected LineReadWriteUtil lineReaderUtil;
    private Collection<String> files;
    private ExecutorService executor;
    protected volatile Queue<StreamsDatum> persistQueue;
    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
    private Future<?> task;

    public AmazonS3Client getAmazonS3Client() {
        return this.amazonS3Client;
    }

    public S3ReaderConfiguration getS3ReaderConfiguration() {
        return this.s3ReaderConfiguration;
    }

    public String getBucketName() {
        return this.s3ReaderConfiguration.getBucket();
    }

    public StreamsResultSet readNew(BigInteger sequence) {
        return null;
    }

    public StreamsResultSet readRange(DateTime start, DateTime end) {
        return null;
    }

    public boolean isRunning() {
        return !this.task.isDone() && !this.task.isCancelled();
    }

    public DatumStatusCounter getDatumStatusCounter() {
        return this.countersTotal;
    }

    public Collection<String> getFiles() {
        return this.files;
    }

    public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
        this.s3ReaderConfiguration = s3ReaderConfiguration;
    }

    public String getId() {
        return STREAMS_ID;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void prepare(Object configurationObject) {
        boolean hasObjectSummaries;
        this.lineReaderUtil = LineReadWriteUtil.getInstance((LineReadWriteConfiguration)this.s3ReaderConfiguration);
        S3PersistReader s3PersistReader = this;
        synchronized (s3PersistReader) {
            BasicAWSCredentials credentials = new BasicAWSCredentials(this.s3ReaderConfiguration.getKey(), this.s3ReaderConfiguration.getSecretKey());
            ClientConfiguration clientConfig = new ClientConfiguration();
            clientConfig.setProtocol(Protocol.valueOf((String)this.s3ReaderConfiguration.getProtocol().toString()));
            S3ClientOptions clientOptions = new S3ClientOptions();
            clientOptions.setPathStyleAccess(false);
            this.amazonS3Client = new AmazonS3Client((AWSCredentials)credentials, clientConfig);
            if (!Strings.isNullOrEmpty((String)this.s3ReaderConfiguration.getRegion())) {
                this.amazonS3Client.setRegion(Region.getRegion((Regions)Regions.fromName((String)this.s3ReaderConfiguration.getRegion())));
            }
            this.amazonS3Client.setS3ClientOptions(clientOptions);
        }
        ListObjectsRequest request = new ListObjectsRequest().withBucketName(this.s3ReaderConfiguration.getBucket()).withPrefix(this.s3ReaderConfiguration.getReaderPath()).withMaxKeys(Integer.valueOf(500));
        ObjectListing listing = this.amazonS3Client.listObjects(request);
        this.files = new ArrayList<String>();
        boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0;
        boolean bl = hasObjectSummaries = listing.getObjectSummaries().size() > 0;
        if (hasCommonPrefixes || hasObjectSummaries) {
            do {
                if (hasCommonPrefixes) {
                    for (String file : listing.getCommonPrefixes()) {
                        this.files.add(file);
                    }
                } else {
                    for (S3ObjectSummary objectSummary : listing.getObjectSummaries()) {
                        this.files.add(objectSummary.getKey());
                    }
                }
            } while ((listing = this.amazonS3Client.listNextBatchOfObjects(listing)).isTruncated());
        } else {
            this.files.add(this.s3ReaderConfiguration.getReaderPath());
        }
        if (this.files.size() <= 0) {
            LOGGER.error("There are no files to read");
        }
        this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue(10000));
        this.executor = Executors.newSingleThreadExecutor();
    }

    public void cleanUp() {
    }

    public StreamsResultSet readAll() {
        this.startStream();
        return new StreamsResultSet(this.persistQueue);
    }

    public void startStream() {
        LOGGER.debug("startStream");
        this.task = this.executor.submit(new S3PersistReaderTask(this));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public StreamsResultSet readCurrent() {
        Class<S3PersistReader> clazz = S3PersistReader.class;
        synchronized (S3PersistReader.class) {
            StreamsResultSet current = new StreamsResultSet((Queue)Queues.newConcurrentLinkedQueue(this.persistQueue));
            current.setCounter(new DatumStatusCounter());
            current.getCounter().add(this.countersCurrent);
            this.countersTotal.add(this.countersCurrent);
            this.countersCurrent = new DatumStatusCounter();
            this.persistQueue.clear();
            // ** MonitorExit[var2_1] (shouldn't be in output)
            return current;
        }
    }
}

