package io.honeycomb.libhoney.transport.batch.impl;

import io.honeycomb.libhoney.transport.batch.BatchConsumer;
import io.honeycomb.libhoney.transport.batch.BatchKeyStrategy;
import io.honeycomb.libhoney.transport.batch.Batcher;
import io.honeycomb.libhoney.transport.batch.ClockProvider;
import io.honeycomb.libhoney.utils.Assert;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/honeycomb/libhoney/transport/batch/impl/DefaultBatcher.class */
public class DefaultBatcher<T, K> implements Batcher<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultBatcher.class);
    private static final long CLEANUP_THRESHOLD = 20;
    private static final long SHUTDOWN_TIMEOUT = 5000;
    private final int batchSize;
    private final long batchTimeoutNanos;
    private final BlockingQueue<T> pendingQueue;
    private final Map<K, DefaultBatcher<T, K>.Batch> batches;
    private final ExecutorService executor;
    private final BatchConsumer<T> batchConsumer;
    private final BatchKeyStrategy<T, K> batchKeyStrategy;
    private final ClockProvider clockProvider;
    private final CountDownLatch closingLatch;
    private volatile boolean running = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/honeycomb/libhoney/transport/batch/impl/DefaultBatcher$Batch.class */
    public class Batch {
        private final List<T> elements;
        private long triggerInstant;
        private long notUsedCounter;

        private Batch() {
            this.elements = new ArrayList(DefaultBatcher.this.batchSize);
            this.triggerInstant = calculateNextTriggerInstant();
        }

        private long calculateNextTriggerInstant() {
            return DefaultBatcher.this.clockProvider.getMonotonicTime() + DefaultBatcher.this.batchTimeoutNanos;
        }

        void add(T t) {
            this.elements.add(t);
        }

        void add(List<T> list) {
            this.elements.addAll(list);
        }

        boolean isFull() {
            return this.elements.size() >= DefaultBatcher.this.batchSize;
        }

        List<T> drainBatch() {
            ArrayList arrayList = new ArrayList(this.elements);
            this.elements.clear();
            this.triggerInstant = calculateNextTriggerInstant();
            return arrayList;
        }

        long getTriggerInstant() {
            return this.triggerInstant;
        }

        boolean isEmpty() {
            return this.elements.isEmpty();
        }

        void markNotUsed() {
            this.notUsedCounter++;
            this.triggerInstant = calculateNextTriggerInstant();
        }

        boolean hasReachedCleanupThreshold() {
            return this.notUsedCounter >= DefaultBatcher.CLEANUP_THRESHOLD;
        }

        boolean hasReachedTriggerInstant() {
            return this.triggerInstant - DefaultBatcher.this.clockProvider.getMonotonicTime() <= 0;
        }
    }

    /* loaded from: input_file:io/honeycomb/libhoney/transport/batch/impl/DefaultBatcher$BatchingWorker.class */
    private class BatchingWorker implements Runnable {
        private BatchingWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    Object poll = DefaultBatcher.this.pendingQueue.poll(DefaultBatcher.this.getLowestTimeout(), TimeUnit.NANOSECONDS);
                    if (poll != null) {
                        DefaultBatcher.this.handleNewEvent(poll);
                    }
                    DefaultBatcher.this.handleTimeoutTriggers();
                } catch (InterruptedException e) {
                    DefaultBatcher.LOG.debug("Batcher thread interrupted. Initiating flush prior to shutdown.");
                    Thread.currentThread().interrupt();
                }
            }
            DefaultBatcher.this.flush();
        }
    }

    public DefaultBatcher(BatchKeyStrategy<T, K> batchKeyStrategy, BatchConsumer<T> batchConsumer, ClockProvider clockProvider, BlockingQueue<T> blockingQueue, int i, long j) {
        Assert.isTrue(i > 0, "batchSize must be > 0");
        Assert.isTrue(j > 0, "batchTimeoutMillis must be > 0");
        Assert.notNull(batchKeyStrategy, "batchKeyStrategy must not be null");
        Assert.notNull(batchConsumer, "batchConsumer must not be null");
        Assert.notNull(clockProvider, "clockProvider must not be null");
        Assert.notNull(blockingQueue, "pendingQueue must not be null");
        this.batchTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(j);
        this.pendingQueue = blockingQueue;
        this.batchConsumer = batchConsumer;
        this.batchSize = i;
        this.batchKeyStrategy = batchKeyStrategy;
        this.clockProvider = clockProvider;
        this.batches = new HashMap();
        this.closingLatch = new CountDownLatch(1);
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(new BatchingWorker());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleNewEvent(T t) throws InterruptedException {
        K key = this.batchKeyStrategy.getKey(t);
        DefaultBatcher<T, K>.Batch batch = this.batches.get(key);
        if (batch == null) {
            batch = new Batch();
            this.batches.put(key, batch);
        }
        batch.add((DefaultBatcher<T, K>.Batch) t);
        if (batch.isFull()) {
            submitBatch(batch);
        }
    }

    int getCurrentlyActiveBatches() {
        return this.batches.size();
    }

    @Override // io.honeycomb.libhoney.transport.batch.Batcher
    public boolean offerEvent(T t) {
        if (!this.running) {
            return false;
        }
        boolean offer = this.pendingQueue.offer(t);
        if (this.running) {
            return offer;
        }
        try {
            this.closingLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !this.pendingQueue.contains(t);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            LOG.debug("Shutting down Batcher thread");
            this.running = false;
            this.executor.shutdownNow();
            try {
                this.executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.error("Interrupted during wait for batcher to terminate", e);
                Thread.currentThread().interrupt();
            }
            LOG.debug("Batcher thread shutdown complete");
        } finally {
            this.closingLatch.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public void flush() {
        try {
            ArrayList arrayList = new ArrayList();
            this.pendingQueue.drainTo(arrayList);
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                handleNewEvent(it.next());
            }
            for (DefaultBatcher<T, K>.Batch batch : this.batches.values()) {
                if (!batch.isEmpty()) {
                    submitBatch(batch);
                }
            }
            this.batches.clear();
        } catch (InterruptedException e) {
            LOG.error("Interrupt thrown during flush. Exiting flush early.", e);
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getLowestTimeout() {
        long j = Long.MAX_VALUE;
        for (DefaultBatcher<T, K>.Batch batch : this.batches.values()) {
            if (batch.getTriggerInstant() < j) {
                j = batch.getTriggerInstant();
            }
        }
        return j - this.clockProvider.getMonotonicTime();
    }

    private void submitBatch(DefaultBatcher<T, K>.Batch batch) throws InterruptedException {
        List<T> drainBatch = batch.drainBatch();
        try {
            this.batchConsumer.consume(drainBatch);
        } catch (InterruptedException e) {
            batch.add((List) drainBatch);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleTimeoutTriggers() throws InterruptedException {
        Iterator<Map.Entry<K, DefaultBatcher<T, K>.Batch>> it = this.batches.entrySet().iterator();
        while (it.hasNext()) {
            DefaultBatcher<T, K>.Batch value = it.next().getValue();
            if (value.hasReachedTriggerInstant()) {
                if (value.isEmpty()) {
                    value.markNotUsed();
                    if (value.hasReachedCleanupThreshold()) {
                        it.remove();
                    }
                } else {
                    submitBatch(value);
                }
            }
        }
    }
}
