/*
 * Decompiled with CFR 0.152.
 */
package io.fluxzero.common;

import io.fluxzero.common.ErrorHandler;
import io.fluxzero.common.Monitored;
import io.fluxzero.common.ObjectUtils;
import io.fluxzero.common.Registration;
import io.fluxzero.common.ThrowingConsumer;
import io.fluxzero.common.ThrowingFunction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Backlog<T>
implements Monitored<List<T>> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(Backlog.class);
    private final int maxBatchSize;
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
    private final ThrowingFunction<List<T>, CompletableFuture<?>> consumer;
    private final ErrorHandler<List<T>> errorHandler;
    private final ExecutorService executorService;
    private final AtomicBoolean flushing = new AtomicBoolean();
    private final AtomicLong insertPosition = new AtomicLong();
    private final AtomicLong flushPosition = new AtomicLong();
    private final ConcurrentSkipListMap<Long, CompletableFuture<Void>> results = new ConcurrentSkipListMap();
    private final Collection<Consumer<List<T>>> monitors = new CopyOnWriteArraySet<Consumer<List<T>>>();

    public static <T> Backlog<T> forConsumer(ThrowingConsumer<List<T>> consumer) {
        return Backlog.forConsumer(consumer, 1024);
    }

    public static <T> Backlog<T> forConsumer(ThrowingConsumer<List<T>> consumer, int maxBatchSize) {
        return Backlog.forConsumer(consumer, maxBatchSize, (e, batch) -> log.error("Consumer {} failed to handle batch of size {}. Continuing with next batch.", new Object[]{consumer, batch.size(), e}));
    }

    public static <T> Backlog<T> forConsumer(ThrowingConsumer<List<T>> consumer, int maxBatchSize, ErrorHandler<List<T>> errorHandler) {
        return new Backlog<T>(list -> {
            consumer.accept((List)list);
            return null;
        }, maxBatchSize, errorHandler);
    }

    public static <T> Backlog<T> forAsyncConsumer(ThrowingFunction<List<T>, CompletableFuture<?>> consumer) {
        return Backlog.forAsyncConsumer(consumer, 1024);
    }

    public static <T> Backlog<T> forAsyncConsumer(ThrowingFunction<List<T>, CompletableFuture<?>> consumer, int maxBatchSize) {
        return Backlog.forAsyncConsumer(consumer, maxBatchSize, (e, batch) -> log.error("Consumer {} failed to handle batch of size {}. Continuing with next batch.", new Object[]{consumer, batch.size(), e}));
    }

    public static <T> Backlog<T> forAsyncConsumer(ThrowingFunction<List<T>, CompletableFuture<?>> consumer, int maxBatchSize, ErrorHandler<List<T>> errorHandler) {
        return new Backlog<T>(consumer, maxBatchSize, errorHandler);
    }

    protected Backlog(ThrowingFunction<List<T>, CompletableFuture<?>> consumer) {
        this(consumer, 1024);
    }

    protected Backlog(ThrowingFunction<List<T>, CompletableFuture<?>> consumer, int maxBatchSize) {
        this(consumer, maxBatchSize, (e, batch) -> log.error("Consumer {} failed to handle batch {}. Continuing with next batch.", new Object[]{consumer, batch, e}));
    }

    protected Backlog(ThrowingFunction<List<T>, CompletableFuture<?>> consumer, int maxBatchSize, ErrorHandler<List<T>> errorHandler) {
        this.maxBatchSize = maxBatchSize;
        this.consumer = consumer;
        this.executorService = Executors.newSingleThreadExecutor(ObjectUtils.newThreadFactory("Backlog"));
        this.errorHandler = errorHandler;
    }

    @SafeVarargs
    public final CompletableFuture<Void> add(T ... values) {
        Collections.addAll(this.queue, values);
        return values.length == 0 ? CompletableFuture.completedFuture(null) : this.awaitFlush(this.insertPosition.updateAndGet(p -> p + (long)values.length));
    }

    public CompletableFuture<Void> add(Collection<? extends T> values) {
        this.queue.addAll(values);
        return values.isEmpty() ? CompletableFuture.completedFuture(null) : this.awaitFlush(this.insertPosition.updateAndGet(p -> p + (long)values.size()));
    }

    private CompletableFuture<Void> awaitFlush(long untilPosition) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        this.results.put(untilPosition, result);
        this.flushIfNotFlushing();
        return result;
    }

    private void flushIfNotFlushing() {
        if (this.flushing.compareAndSet(false, true)) {
            this.executorService.execute(this::flush);
        }
    }

    private void flush() {
        try {
            while (!this.queue.isEmpty()) {
                CompletableFuture<Object> future;
                T value;
                ArrayList batch = new ArrayList(this.maxBatchSize);
                while (batch.size() < this.maxBatchSize && (value = this.queue.poll()) != null) {
                    batch.add(value);
                }
                try {
                    future = this.consumer.apply(batch);
                }
                catch (Throwable e2) {
                    future = CompletableFuture.failedFuture(e2);
                    this.errorHandler.handleError(e2, batch);
                }
                long lastPosition = this.flushPosition.addAndGet(batch.size());
                if (future == null) {
                    this.completeResults(lastPosition, null);
                } else {
                    future.whenComplete((r, e) -> this.completeResults(lastPosition, (Throwable)e));
                }
                this.monitors.forEach(m -> m.accept(batch));
            }
            this.flushing.set(false);
            if (!this.queue.isEmpty()) {
                this.flushIfNotFlushing();
            }
        }
        catch (Throwable e3) {
            log.error("Failed to flush the backlog", e3);
            this.flushing.set(false);
            throw e3;
        }
    }

    protected void completeResults(long untilPosition, Throwable e) {
        NavigableMap futures = this.results.headMap((Object)untilPosition, true);
        futures.forEach((k, v) -> {
            if (e == null) {
                v.complete(null);
            } else {
                v.completeExceptionally(e);
            }
        });
        futures.clear();
    }

    @Override
    public Registration registerMonitor(Consumer<List<T>> monitor) {
        this.monitors.add(monitor);
        return () -> this.monitors.remove(monitor);
    }

    public void shutDown() {
        try {
            this.executorService.shutdown();
            try {
                this.executorService.awaitTermination(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                log.warn("Shutdown of executor was interrupted", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        }
        catch (Throwable e) {
            log.warn("Failed to shutdown a backlog", e);
        }
    }

    @FunctionalInterface
    public static interface BatchConsumer<T> {
        public CompletableFuture<Void> accept(List<T> var1) throws Exception;
    }
}

