package com.github.phantomthief.collection.impl;

import com.github.phantomthief.collection.BufferTrigger;
import com.github.phantomthief.util.MoreLocks;
import com.github.phantomthief.util.ThrowableConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/phantomthief/collection/impl/BatchConsumeBlockingQueueTrigger.class */
public class BatchConsumeBlockingQueueTrigger<E> implements BufferTrigger<E> {
    private static final Logger logger = LoggerFactory.getLogger(BatchConsumeBlockingQueueTrigger.class);
    private final BlockingQueue<E> queue;
    private final int batchSize;
    private final long lingerMs;
    private final ThrowableConsumer<List<E>, Exception> consumer;
    private final BiConsumer<Throwable, List<E>> exceptionHandler;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicBoolean running = new AtomicBoolean();

    /* loaded from: input_file:com/github/phantomthief/collection/impl/BatchConsumeBlockingQueueTrigger$BatchConsumerRunnable.class */
    private class BatchConsumerRunnable implements Runnable {
        private BatchConsumerRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                BatchConsumeBlockingQueueTrigger.this.doBatchConsumer(TriggerType.LINGER);
            } finally {
                BatchConsumeBlockingQueueTrigger.this.scheduledExecutorService.schedule(this, BatchConsumeBlockingQueueTrigger.this.lingerMs, TimeUnit.MILLISECONDS);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/phantomthief/collection/impl/BatchConsumeBlockingQueueTrigger$TriggerType.class */
    public enum TriggerType {
        LINGER,
        ENQUEUE,
        MANUALLY
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchConsumeBlockingQueueTrigger(long j, int i, int i2, BiConsumer<Throwable, List<E>> biConsumer, ThrowableConsumer<List<E>, Exception> throwableConsumer, ScheduledExecutorService scheduledExecutorService) {
        this.lingerMs = j;
        this.batchSize = i;
        this.queue = new LinkedBlockingQueue(Integer.max(i2, i));
        this.consumer = throwableConsumer;
        this.exceptionHandler = biConsumer;
        this.scheduledExecutorService = scheduledExecutorService;
        this.scheduledExecutorService.schedule(new BatchConsumerRunnable(), this.lingerMs, TimeUnit.MILLISECONDS);
    }

    @Deprecated
    public static BatchConsumerTriggerBuilder<Object> newBuilder() {
        return new BatchConsumerTriggerBuilder<>();
    }

    @Override // com.github.phantomthief.collection.BufferTrigger
    public void enqueue(E e) {
        try {
            this.queue.put(e);
            tryTrigBatchConsume();
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
    }

    private void tryTrigBatchConsume() {
        if (this.queue.size() >= this.batchSize) {
            MoreLocks.runWithTryLock(this.lock, () -> {
                if (this.queue.size() < this.batchSize || this.running.get()) {
                    return;
                }
                this.scheduledExecutorService.execute(() -> {
                    doBatchConsumer(TriggerType.ENQUEUE);
                });
                this.running.set(true);
            });
        }
    }

    @Override // com.github.phantomthief.collection.BufferTrigger
    public void manuallyDoTrigger() {
        doBatchConsumer(TriggerType.MANUALLY);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doBatchConsumer(TriggerType triggerType) {
        MoreLocks.runWithLock(this.lock, () -> {
            try {
                this.running.set(true);
                while (!this.queue.isEmpty()) {
                    if (triggerType == TriggerType.ENQUEUE && this.queue.size() < this.batchSize) {
                        return;
                    }
                    ArrayList arrayList = new ArrayList(Math.min(this.batchSize, this.queue.size()));
                    this.queue.drainTo(arrayList, this.batchSize);
                    if (!arrayList.isEmpty()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("do batch consumer:{}, size:{}", triggerType, Integer.valueOf(arrayList.size()));
                        }
                        doConsume(arrayList);
                    }
                }
            } finally {
                this.running.set(false);
            }
        });
    }

    private void doConsume(List<E> list) {
        try {
            this.consumer.accept(list);
        } catch (Throwable th) {
            if (this.exceptionHandler == null) {
                logger.error("Ops.", th);
                return;
            }
            try {
                this.exceptionHandler.accept(th, list);
            } catch (Throwable th2) {
                logger.error("", th2);
            }
        }
    }

    @Override // com.github.phantomthief.collection.BufferTrigger
    public long getPendingChanges() {
        return this.queue.size();
    }
}
