package com.github.lontime.base.commonj.batch;

import com.github.lontime.base.commonj.utils.CollectionHelper;
import com.github.lontime.base.commonj.utils.LoggerHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/github/lontime/base/commonj/batch/FluxBatchRunner.class */
/**
 * Buffers items and hands them to a {@link Consumer} in batches, running each batch on a
 * Reactor {@link Scheduler}.
 *
 * <p>Every call to {@link #add(Object)} queues one item. Each time the running item count
 * reaches a multiple of the batch size, up to one batch worth of items is drained from the
 * queue and submitted asynchronously. The resulting {@link CompletableFuture} is kept in an
 * internal queue. Each time the count reaches a multiple of {@code batchSize * 20}, the calling
 * thread blocks until all futures queued so far have completed, which bounds the number of
 * outstanding batches.
 *
 * <p>Call {@link #awaitAll()} or {@link #awaitAll(long, TimeUnit)} to flush the remaining
 * items and wait for the outstanding batches.
 *
 * <p>Item counting is guarded by an internal lock and the buffers are concurrent queues.
 * NOTE(review): no stronger thread-safety guarantee is established by this class; confirm
 * against callers before sharing one instance between threads that also call
 * {@code awaitAll}.
 *
 * @param <T> type of the buffered items
 */
public class FluxBatchRunner<T> {
    /** Batch size used when the caller does not specify one. */
    private static final int DEFAULT_BATCH_SIZE = 20;

    /** Number of batches submitted between two blocking drains of the future queue. */
    private static final int FUTURE_DRAIN_FACTOR = 20;

    private final int batchSize;
    /** Item count interval at which {@link #add(Object)} waits for queued futures. */
    private final long futureMaxSize;
    private final Consumer<Collection<T>> consumer;
    private final Queue<T> records = new ConcurrentLinkedQueue<>();
    private final Queue<CompletableFuture<Boolean>> futureQueues = new ConcurrentLinkedQueue<>();
    private final LongAdder counter = new LongAdder();
    private final Object lock = new Object();
    /** Volatile because it can be replaced through {@link #setScheduler(Scheduler)}. */
    private volatile Scheduler scheduler;
    /** Wall-clock time (ms) of the most recent {@code add}; written here, not read in this class. */
    private volatile long lastActiveTime;

    /**
     * Creates a runner with the default batch size.
     *
     * @param consumer receives each batch; must not be {@code null}
     * @param <R> type of the buffered items
     * @return a new runner
     */
    public static <R> FluxBatchRunner<R> create(Consumer<Collection<R>> consumer) {
        return new FluxBatchRunner<>(consumer);
    }

    /**
     * Creates a runner with an explicit batch size.
     *
     * @param num batch size; must be non-null and positive
     * @param consumer receives each batch; must not be {@code null}
     * @param <R> type of the buffered items
     * @return a new runner
     */
    public static <R> FluxBatchRunner<R> create(Integer num, Consumer<Collection<R>> consumer) {
        return new FluxBatchRunner<>(num, consumer);
    }

    /**
     * Creates a runner with the default batch size.
     *
     * @param consumer receives each batch; must not be {@code null}
     */
    public FluxBatchRunner(Consumer<Collection<T>> consumer) {
        this(Integer.valueOf(DEFAULT_BATCH_SIZE), consumer);
    }

    /**
     * Creates a runner that uses {@link Schedulers#parallel()} until another scheduler is set.
     *
     * @param num batch size; must be non-null and positive
     * @param consumer receives each batch; must not be {@code null}
     * @throws NullPointerException if {@code consumer} or {@code num} is {@code null}
     * @throws IllegalArgumentException if {@code num} is not positive
     */
    public FluxBatchRunner(Integer num, Consumer<Collection<T>> consumer) {
        Objects.requireNonNull(consumer, "consumer must not be null");
        Objects.requireNonNull(num, "num must not be null");
        if (num.intValue() <= 0) {
            // A zero size would otherwise surface later as a division by zero inside add().
            throw new IllegalArgumentException("batch size must be positive: " + num);
        }
        this.batchSize = num.intValue();
        // Widened to long so that very large batch sizes cannot overflow the product.
        this.futureMaxSize = (long) this.batchSize * FUTURE_DRAIN_FACTOR;
        this.consumer = consumer;
        this.scheduler = Schedulers.parallel();
        this.lastActiveTime = System.currentTimeMillis();
    }

    /**
     * Replaces the scheduler used for batches submitted after this call.
     *
     * @param scheduler the scheduler to run batches on; must not be {@code null}
     * @throws NullPointerException if {@code scheduler} is {@code null}
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler must not be null");
    }

    /**
     * Returns a snapshot of the futures that are currently queued, i.e. submitted but not yet
     * removed by a drain.
     *
     * @return a new list; modifying it does not affect the runner
     */
    public List<CompletableFuture<Boolean>> getFutures() {
        return new ArrayList<>(this.futureQueues);
    }

    /**
     * Queues one item, submitting a batch and/or waiting for outstanding batches when the
     * running count hits the respective thresholds.
     *
     * @param t the item to buffer; the backing queue rejects {@code null}
     */
    public void add(T t) {
        this.records.add(t);
        // Use the count observed under the lock for both checks; re-reading the counter
        // afterwards could miss or repeat a threshold when several threads add concurrently.
        final long count = incrementAndGetCount();
        if (count % this.batchSize == 0) {
            subscribeToMono(toUnmodifiableList(this.records, this.batchSize));
        }
        if (count % this.futureMaxSize == 0) {
            pollFutures();
        }
    }

    /** Increments the item counter, records the activity time, and returns the new count. */
    private long incrementAndGetCount() {
        synchronized (this.lock) {
            this.counter.increment();
            this.lastActiveTime = System.currentTimeMillis();
            return this.counter.sum();
        }
    }

    /** Removes every queued future and blocks on each until it completes. */
    private void pollFutures() {
        CompletableFuture<Boolean> pending;
        while ((pending = this.futureQueues.poll()) != null) {
            pending.join();
        }
    }

    /**
     * Removes every queued future and waits for each one, applying the timeout to each
     * future individually. Failures and timeouts are logged and do not stop the drain;
     * an interrupt restores the thread's interrupt flag and stops waiting.
     */
    private void pollFutures(long j, TimeUnit timeUnit) {
        CompletableFuture<Boolean> pending;
        while ((pending = this.futureQueues.poll()) != null) {
            try {
                pending.get(j, timeUnit);
            } catch (InterruptedException e) {
                LoggerHelper.warnv((Throwable) e, "awaitAll error", new Object[0]);
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                LoggerHelper.warnv((Throwable) e, "awaitAll error", new Object[0]);
            }
        }
    }

    /**
     * Submits all remaining buffered items as a final batch, waits for every queued future,
     * and resets the item counter.
     */
    public void awaitAll() {
        try {
            subscribeLast();
            pollFutures();
        } finally {
            resetCounter();
        }
    }

    /**
     * Submits all remaining buffered items as a final batch, waits for every queued future
     * with a timeout, and resets the item counter.
     *
     * @param j maximum time to wait for each individual future
     * @param timeUnit unit of {@code j}
     */
    public void awaitAll(long j, TimeUnit timeUnit) {
        try {
            subscribeLast();
            pollFutures(j, timeUnit);
        } finally {
            resetCounter();
        }
    }

    /** Resets the item counter under the same lock that guards increments. */
    private void resetCounter() {
        synchronized (this.lock) {
            this.counter.reset();
        }
    }

    /** Submits whatever is left in the record queue as one batch. */
    private void subscribeLast() {
        subscribeToMono(toUnmodifiableList(this.records));
    }

    /** Drains the whole queue into an unmodifiable list. */
    private List<T> toUnmodifiableList(Queue<T> queue) {
        return toUnmodifiableList(queue, -1);
    }

    /**
     * Drains up to {@code i} elements from the queue into an unmodifiable list.
     *
     * @param queue the queue to drain
     * @param i maximum number of elements to take; a negative value means no limit
     * @return the drained elements in polling order, possibly empty
     */
    private List<T> toUnmodifiableList(Queue<T> queue, int i) {
        final List<T> drained = new ArrayList<>();
        while (i < 0 || drained.size() < i) {
            final T next = queue.poll();
            if (next == null) {
                // Queue exhausted before the limit was reached.
                break;
            }
            drained.add(next);
        }
        return Collections.unmodifiableList(drained);
    }

    /**
     * Submits a non-empty batch to the consumer on the current scheduler and queues the
     * resulting future. The future yields {@code TRUE} when the consumer returns normally
     * and {@code FALSE} when it throws an {@link Exception} (which is logged).
     */
    private void subscribeToMono(Collection<T> collection) {
        if (CollectionHelper.isEmpty((Collection<?>) collection)) {
            return;
        }
        final CompletableFuture<Boolean> future = Mono.just(collection)
                .subscribeOn(this.scheduler)
                .map(batch -> {
                    try {
                        this.consumer.accept(batch);
                        return Boolean.TRUE;
                    } catch (Exception e) {
                        LoggerHelper.warnv((Throwable) e, "subscribeToMono error", new Object[0]);
                        return Boolean.FALSE;
                    }
                })
                .toFuture();
        this.futureQueues.add(future);
    }
}
