package org.apache.streams.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/s3/S3PersistReader.class */
public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable {
    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
    public static final String STREAMS_ID = "S3PersistReader";
    protected static final char DELIMITER = '\t';
    private S3ReaderConfiguration s3ReaderConfiguration;
    private AmazonS3Client amazonS3Client;
    protected LineReadWriteUtil lineReaderUtil;
    private Collection<String> files;
    private ExecutorService executor;
    protected volatile Queue<StreamsDatum> persistQueue;
    private Future<?> task;
    private ObjectMapper mapper = new ObjectMapper();
    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();

    public AmazonS3Client getAmazonS3Client() {
        return this.amazonS3Client;
    }

    public S3ReaderConfiguration getS3ReaderConfiguration() {
        return this.s3ReaderConfiguration;
    }

    public String getBucketName() {
        return this.s3ReaderConfiguration.getBucket();
    }

    public StreamsResultSet readNew(BigInteger bigInteger) {
        return null;
    }

    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return null;
    }

    public boolean isRunning() {
        return (this.task.isDone() || this.task.isCancelled()) ? false : true;
    }

    public DatumStatusCounter getDatumStatusCounter() {
        return this.countersTotal;
    }

    public Collection<String> getFiles() {
        return this.files;
    }

    public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
        this.s3ReaderConfiguration = s3ReaderConfiguration;
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void prepare(Object obj) {
        this.lineReaderUtil = LineReadWriteUtil.getInstance(this.s3ReaderConfiguration);
        synchronized (this) {
            BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(this.s3ReaderConfiguration.getKey(), this.s3ReaderConfiguration.getSecretKey());
            ClientConfiguration clientConfiguration = new ClientConfiguration();
            clientConfiguration.setProtocol(Protocol.valueOf(this.s3ReaderConfiguration.getProtocol().toString()));
            S3ClientOptions s3ClientOptions = new S3ClientOptions();
            s3ClientOptions.setPathStyleAccess(false);
            this.amazonS3Client = new AmazonS3Client(basicAWSCredentials, clientConfiguration);
            if (!Strings.isNullOrEmpty(this.s3ReaderConfiguration.getRegion())) {
                this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(this.s3ReaderConfiguration.getRegion())));
            }
            this.amazonS3Client.setS3ClientOptions(s3ClientOptions);
        }
        ObjectListing listObjects = this.amazonS3Client.listObjects(new ListObjectsRequest().withBucketName(this.s3ReaderConfiguration.getBucket()).withPrefix(this.s3ReaderConfiguration.getReaderPath()).withMaxKeys(500));
        this.files = new ArrayList();
        boolean z = listObjects.getCommonPrefixes().size() > 0;
        boolean z2 = listObjects.getObjectSummaries().size() > 0;
        if (z || z2) {
            do {
                if (z) {
                    Iterator it = listObjects.getCommonPrefixes().iterator();
                    while (it.hasNext()) {
                        this.files.add((String) it.next());
                    }
                } else {
                    Iterator it2 = listObjects.getObjectSummaries().iterator();
                    while (it2.hasNext()) {
                        this.files.add(((S3ObjectSummary) it2.next()).getKey());
                    }
                }
                listObjects = this.amazonS3Client.listNextBatchOfObjects(listObjects);
            } while (listObjects.isTruncated());
        } else {
            this.files.add(this.s3ReaderConfiguration.getReaderPath());
        }
        if (this.files.size() <= 0) {
            LOGGER.error("There are no files to read");
        }
        this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue(10000));
        this.executor = Executors.newSingleThreadExecutor();
    }

    public void cleanUp() {
    }

    public StreamsResultSet readAll() {
        startStream();
        return new StreamsResultSet(this.persistQueue);
    }

    public void startStream() {
        LOGGER.debug("startStream");
        this.task = this.executor.submit(new S3PersistReaderTask(this));
    }

    public StreamsResultSet readCurrent() {
        StreamsResultSet streamsResultSet;
        synchronized (S3PersistReader.class) {
            streamsResultSet = new StreamsResultSet(Queues.newConcurrentLinkedQueue(this.persistQueue));
            streamsResultSet.setCounter(new DatumStatusCounter());
            streamsResultSet.getCounter().add(this.countersCurrent);
            this.countersTotal.add(this.countersCurrent);
            this.countersCurrent = new DatumStatusCounter();
            this.persistQueue.clear();
        }
        return streamsResultSet;
    }
}
