package org.apache.rya.mongodb.batch;

import com.mongodb.DuplicateKeyException;
import com.mongodb.MongoBulkWriteException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.rya.mongodb.batch.collection.CollectionType;
import org.apache.rya.shaded.com.google.common.base.Preconditions;
import org.apache.rya.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;

/* loaded from: input_file:org/apache/rya/mongodb/batch/MongoDbBatchWriter.class */
public class MongoDbBatchWriter<T> {
    private static final int CHECK_QUEUE_INTERVAL_MS = 10;
    private final CollectionType<T> collectionType;
    private final long batchFlushTimeMs;
    private final ArrayBlockingQueue<T> statementInsertionQueue;
    private ScheduledFuture<?> flushBatchFuture;
    private Thread queueFullCheckerThread;
    private static final Logger log = Logger.getLogger(MongoDbBatchWriter.class);
    private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("Queue Full Checker Thread - %d").build();
    private final ScheduledThreadPoolExecutor scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(0);
    private final AtomicBoolean isInit = new AtomicBoolean();
    private final Runnable flushBatchTask = new BatchFlusher();

    /* loaded from: input_file:org/apache/rya/mongodb/batch/MongoDbBatchWriter$BatchFlusher.class */
    private class BatchFlusher implements Runnable {
        private BatchFlusher() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!MongoDbBatchWriter.this.statementInsertionQueue.isEmpty()) {
                    MongoDbBatchWriter.log.trace("Running statement insertion flush task. Too much time has passed without any object insertions so all queued data is being flushed.");
                    MongoDbBatchWriter.this.flush();
                }
            } catch (Exception e) {
                MongoDbBatchWriter.log.error("Error flush out the statements", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/rya/mongodb/batch/MongoDbBatchWriter$QueueFullChecker.class */
    private class QueueFullChecker implements Runnable {
        private QueueFullChecker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (MongoDbBatchWriter.this.isInit.get()) {
                try {
                    if (MongoDbBatchWriter.this.statementInsertionQueue.remainingCapacity() == 0) {
                        MongoDbBatchWriter.log.trace("Statement queue is FULL -> going to empty it");
                        try {
                            MongoDbBatchWriter.this.flush();
                        } catch (MongoDbBatchWriterException e) {
                            MongoDbBatchWriter.log.error("Error emptying queue", e);
                        }
                    }
                    Thread.sleep(10L);
                } catch (InterruptedException e2) {
                    MongoDbBatchWriter.log.error("Encountered an unexpected error while checking the batch queue.", e2);
                    return;
                }
            }
        }
    }

    public MongoDbBatchWriter(CollectionType<T> collectionType, MongoDbBatchWriterConfig mongoDbBatchWriterConfig) {
        this.collectionType = (CollectionType) Preconditions.checkNotNull(collectionType);
        this.batchFlushTimeMs = ((MongoDbBatchWriterConfig) Preconditions.checkNotNull(mongoDbBatchWriterConfig)).getBatchFlushTimeMs();
        this.statementInsertionQueue = new ArrayBlockingQueue<>(mongoDbBatchWriterConfig.getBatchSize());
    }

    public void start() throws MongoDbBatchWriterException {
        if (this.isInit.get()) {
            return;
        }
        if (this.flushBatchFuture == null) {
            this.flushBatchFuture = startFlushTimer();
        }
        if (this.queueFullCheckerThread == null) {
            this.queueFullCheckerThread = QUEUE_THREAD_FACTORY.newThread(new QueueFullChecker());
        }
        this.isInit.set(true);
        this.queueFullCheckerThread.start();
    }

    public void shutdown() throws MongoDbBatchWriterException {
        this.isInit.set(false);
        if (this.flushBatchFuture != null) {
            this.flushBatchFuture.cancel(true);
            this.flushBatchFuture = null;
        }
        if (this.queueFullCheckerThread == null || !this.queueFullCheckerThread.isAlive()) {
            return;
        }
        try {
            this.queueFullCheckerThread.join(20L);
        } catch (InterruptedException e) {
            log.error("Error waiting for thread to finish", e);
        }
        this.queueFullCheckerThread = null;
    }

    public void addObjectToQueue(T t) throws MongoDbBatchWriterException {
        if (t != null) {
            try {
                resetFlushTimer();
                this.statementInsertionQueue.put(t);
            } catch (Exception e) {
                throw new MongoDbBatchWriterException("Error adding object to batch queue.", e);
            }
        }
    }

    public void addObjectsToQueue(List<T> list) throws MongoDbBatchWriterException {
        if (list != null) {
            Iterator<T> it = list.iterator();
            while (it.hasNext()) {
                addObjectToQueue(it.next());
            }
        }
    }

    public void flush() throws MongoDbBatchWriterException {
        ArrayList arrayList = new ArrayList();
        try {
            this.statementInsertionQueue.drainTo(arrayList);
            if (!arrayList.isEmpty()) {
                this.collectionType.insertMany(arrayList);
            }
        } catch (DuplicateKeyException e) {
            log.warn(e);
        } catch (MongoBulkWriteException e2) {
            if (!e2.getMessage().contains("duplicate key error")) {
                throw new MongoDbBatchWriterException("Error flushing statements", e2);
            }
            log.warn(e2);
        } catch (Exception e3) {
            throw new MongoDbBatchWriterException("Error flushing statements", e3);
        }
    }

    private void resetFlushTimer() throws MongoDbBatchWriterException {
        this.flushBatchFuture.cancel(false);
        this.flushBatchFuture = startFlushTimer();
    }

    private ScheduledFuture<?> startFlushTimer() throws MongoDbBatchWriterException {
        try {
            return this.scheduledExecutor.schedule(this.flushBatchTask, this.batchFlushTimeMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new MongoDbBatchWriterException("Error starting batch flusher", e);
        }
    }
}
