package org.apache.streams.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/elasticsearch/ElasticsearchPersistReader.class */
public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable {
    public static final String STREAMS_ID = "ElasticsearchPersistReader";
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
    protected volatile Queue<StreamsDatum> persistQueue;
    private ElasticsearchQuery elasticsearchQuery;
    private ElasticsearchReaderConfiguration config;
    private ExecutorService executor;
    private Future<?> readerTask;
    private int threadPoolSize = 10;
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    /* loaded from: input_file:org/apache/streams/elasticsearch/ElasticsearchPersistReader$ElasticsearchPersistReaderTask.class */
    public static class ElasticsearchPersistReaderTask implements Runnable {
        private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
        private ElasticsearchPersistReader reader;
        private ElasticsearchQuery query;
        private ObjectMapper mapper = StreamsJacksonMapper.getInstance();

        public ElasticsearchPersistReaderTask(ElasticsearchPersistReader elasticsearchPersistReader, ElasticsearchQuery elasticsearchQuery) {
            this.reader = elasticsearchPersistReader;
            this.query = elasticsearchQuery;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.query.hasNext()) {
                SearchHit next = this.query.next();
                try {
                    StreamsDatum streamsDatum = new StreamsDatum((ObjectNode) this.mapper.readValue(next.getSourceAsString(), ObjectNode.class), next.getId());
                    streamsDatum.getMetadata().put("id", next.getId());
                    streamsDatum.getMetadata().put("index", next.getIndex());
                    streamsDatum.getMetadata().put("type", next.getType());
                    if (next.fields().containsKey("_timestamp")) {
                        streamsDatum.setTimestamp(new DateTime(((Long) next.field("_timestamp").getValue()).longValue()));
                    }
                    if (next.fields().containsKey("_parent")) {
                        streamsDatum.getMetadata().put("parent", ((SearchHitField) next.fields().get("_parent")).value());
                    }
                    this.reader.write(streamsDatum);
                } catch (IOException e) {
                    LOGGER.warn("Unable to process json source: ", next.getSourceAsString());
                }
            }
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e2) {
                LOGGER.warn("Thread interrupted", e2);
            }
        }
    }

    public ElasticsearchPersistReader() {
    }

    public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchReaderConfiguration) {
        this.config = elasticsearchReaderConfiguration;
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void startStream() {
        LOGGER.debug("startStream");
        this.executor = Executors.newSingleThreadExecutor();
        this.readerTask = this.executor.submit(new ElasticsearchPersistReaderTask(this, this.elasticsearchQuery));
    }

    public void prepare(Object obj) {
        this.elasticsearchQuery = this.config == null ? new ElasticsearchQuery() : new ElasticsearchQuery(this.config);
        this.elasticsearchQuery.execute(obj);
        this.persistQueue = constructQueue();
    }

    public StreamsResultSet readAll() {
        return readCurrent();
    }

    public StreamsResultSet readCurrent() {
        try {
            this.lock.writeLock().lock();
            StreamsResultSet streamsResultSet = new StreamsResultSet(this.persistQueue);
            streamsResultSet.setCounter(new DatumStatusCounter());
            this.persistQueue = constructQueue();
            return streamsResultSet;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    public StreamsResultSet readNew(BigInteger bigInteger) {
        return readCurrent();
    }

    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return readCurrent();
    }

    public boolean isRunning() {
        return this.persistQueue.size() > 0 || !(this.readerTask.isDone() || this.readerTask.isCancelled());
    }

    public void cleanUp() {
        shutdownAndAwaitTermination(this.executor);
        LOGGER.info("PersistReader done");
        if (this.elasticsearchQuery != null) {
            this.elasticsearchQuery.cleanUp();
        }
    }

    protected void write(StreamsDatum streamsDatum) {
        boolean offer;
        do {
            try {
                this.lock.readLock().lock();
                offer = this.persistQueue.offer(streamsDatum);
                Thread.yield();
            } finally {
                this.lock.readLock().unlock();
            }
        } while (!offer);
    }

    protected void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                    LOGGER.error("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private Queue<StreamsDatum> constructQueue() {
        return Queues.synchronizedQueue(new LinkedBlockingQueue(10000));
    }
}
