/*
 * Decompiled with CFR 0.152.
 */
package com.github.lontime.base.commonj.batch;

import com.github.lontime.base.commonj.utils.CollectionHelper;
import com.github.lontime.base.commonj.utils.LoggerHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class FluxBatchRunner<T> {
    private static final int DEFAULT_BATCH_SIZE = 20;
    private final Integer batchSize;
    private final Integer futureMaxSize;
    private final Consumer<Collection<T>> consumer;
    private Scheduler scheduler;
    private Queue<T> records;
    private Queue<CompletableFuture<Boolean>> futureQueues;
    private LongAdder counter = new LongAdder();
    private Object lock = new Object();
    private volatile long lastActiveTime;

    public static <R> FluxBatchRunner<R> create(Consumer<Collection<R>> consumer) {
        return new FluxBatchRunner(consumer);
    }

    public static <R> FluxBatchRunner<R> create(Integer batchSize, Consumer<Collection<R>> consumer) {
        return new FluxBatchRunner(batchSize, consumer);
    }

    public FluxBatchRunner(Consumer<Collection<T>> consumer) {
        this(20, consumer);
    }

    public FluxBatchRunner(Integer batchSize, Consumer<Collection<T>> consumer) {
        Objects.requireNonNull(consumer, "consumer is not null");
        this.batchSize = batchSize;
        this.futureMaxSize = batchSize * 20;
        this.consumer = consumer;
        this.scheduler = Schedulers.parallel();
        this.records = new ConcurrentLinkedQueue<T>();
        this.futureQueues = new ConcurrentLinkedQueue<CompletableFuture<Boolean>>();
        this.lastActiveTime = System.currentTimeMillis();
    }

    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    public List<CompletableFuture<Boolean>> getFutures() {
        return this.futureQueues.stream().collect(Collectors.toList());
    }

    public void add(T record) {
        long sum;
        this.records.add(record);
        if (this.reachCount()) {
            this.subscribeToMono(this.toUnmodifiableList(this.records, this.batchSize));
        }
        if ((sum = this.counter.sum()) > 0L && sum % (long)this.futureMaxSize.intValue() == 0L) {
            this.pollFutures();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean reachCount() {
        Object object = this.lock;
        synchronized (object) {
            this.counter.increment();
            this.lastActiveTime = System.currentTimeMillis();
            long sum = this.counter.sum();
            boolean result = sum > 0L && sum % (long)this.batchSize.intValue() == 0L;
            return result;
        }
    }

    private void pollFutures() {
        CompletableFuture<Boolean> future;
        while ((future = this.futureQueues.poll()) != null) {
            future.join();
        }
    }

    private void pollFutures(long timeout, TimeUnit unit) {
        CompletableFuture<Boolean> future;
        while ((future = this.futureQueues.poll()) != null) {
            try {
                future.get(timeout, unit);
            }
            catch (Exception e) {
                LoggerHelper.warnv((Throwable)e, "awaitAll error", new Object[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void awaitAll() {
        this.subscribeLast();
        this.pollFutures();
        Object object = this.lock;
        synchronized (object) {
            this.counter.reset();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void awaitAll(long timeout, TimeUnit unit) {
        this.subscribeLast();
        this.pollFutures(timeout, unit);
        Object object = this.lock;
        synchronized (object) {
            this.counter.reset();
        }
    }

    private void subscribeLast() {
        this.subscribeToMono(this.toUnmodifiableList(this.records));
    }

    private List<T> toUnmodifiableList(Queue<T> partRecords) {
        return this.toUnmodifiableList(partRecords, -1);
    }

    private List<T> toUnmodifiableList(Queue<T> partRecords, int size) {
        T record;
        ArrayList<T> list = new ArrayList<T>();
        for (int i = 0; (size < 0 || i < size) && (record = partRecords.poll()) != null; ++i) {
            list.add(record);
        }
        return Collections.unmodifiableList(list);
    }

    private void subscribeToMono(Collection<T> partRecords) {
        if (CollectionHelper.isEmpty(partRecords)) {
            return;
        }
        this.futureQueues.add(Mono.just(partRecords).subscribeOn(this.scheduler).map(s -> {
            try {
                this.consumer.accept((Collection<Collection>)s);
                return Boolean.TRUE;
            }
            catch (Exception ex) {
                LoggerHelper.warnv((Throwable)ex, "subscribeToMono error", new Object[0]);
                return Boolean.FALSE;
            }
        }).toFuture());
    }
}

