/*
 * Decompiled with CFR 0.152.
 */
package cn.gongler.util;

import cn.gongler.util.GonglerUtil;
import cn.gongler.util.ITask;
import cn.gongler.util.MinuteTimer;
import cn.gongler.util.function.ExceptionConsumer;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * Hands submitted {@link ITask}s to one or more background worker threads through a
 * {@link BlockingQueue}.
 *
 * <p>Producers call {@link #accept(ITask)} (or a typed view from {@link #toView}); worker
 * threads started via {@code GonglerUtil.StartDaemonThread} take tasks off the queue and run
 * {@code executeWithCatchAny()} on each. {@link #close()} asks the workers to stop.
 *
 * <p>Thread-safety: {@code accept}, {@code size}, {@code acceptTaskCount} and {@code close}
 * may be called from any thread; the stop flag is {@code volatile} and the submission counter
 * is atomic.
 */
public class QueueConsumer
implements Consumer<ITask>,
AutoCloseable {
    // Kept from the original source; the class does not implement Serializable, so this is unused.
    private static final long serialVersionUID = 4800910141074860566L;

    /** Base name of the worker thread(s); also used in log lines and {@link #toString()}. */
    private final String threadName;
    /** Number of worker threads started by the constructor. */
    private final int threadNum;
    private final BlockingQueue<ITask> taskQueue;
    private final List<Thread> backThreads = new ArrayList<Thread>();
    /** Stop flag polled by the workers; volatile so a write in {@link #close()} is seen by them. */
    private volatile boolean cancel = false;
    /** Number of {@link #accept(ITask)} calls; atomic because producers may run concurrently. */
    private final AtomicLong taskCount = new AtomicLong();

    /**
     * Creates a single-worker consumer backed by a bounded queue of 100000 entries.
     *
     * @param threadName name of the worker thread
     * @return the started consumer
     */
    public static QueueConsumer of(String threadName) {
        return QueueConsumer.of(threadName, 100000);
    }

    /**
     * Creates a single-worker consumer on top of a caller-supplied queue.
     *
     * @param threadName name of the worker thread
     * @param queue      queue the worker takes tasks from
     * @return the started consumer
     */
    @Deprecated
    public static QueueConsumer of(String threadName, BlockingQueue<ITask> queue) {
        return new QueueConsumer(threadName, queue);
    }

    /**
     * Creates a single-worker consumer backed by an {@link ArrayBlockingQueue}.
     *
     * @param threadName    name of the worker thread
     * @param queueCapacity fixed capacity of the queue
     * @return the started consumer
     */
    public static QueueConsumer of(String threadName, int queueCapacity) {
        return QueueConsumer.of(threadName, new ArrayBlockingQueue<ITask>(queueCapacity));
    }

    /**
     * Creates a multi-worker consumer whose workers pull up to 10 tasks per batch from an
     * unbounded queue.
     *
     * @param threadName    base name; worker {@code i} is named {@code threadName + i}
     * @param queueCapacity backlog size above which the periodic check clears the queue
     * @param threadCount   number of worker threads to start
     * @return the started consumer
     */
    public static QueueConsumer of(String threadName, int queueCapacity, int threadCount) {
        return new QueueConsumer(threadName, queueCapacity, threadCount, 10);
    }

    /**
     * Single-worker mode: starts one thread that takes and executes one task at a time.
     */
    private QueueConsumer(String threadName, BlockingQueue<ITask> queue) {
        this.threadName = threadName;
        this.taskQueue = queue;
        // Exactly one worker is started below; record it so toString() does not report 0.
        this.threadNum = 1;
        Thread backThread = GonglerUtil.StartDaemonThread(threadName, () -> {
            this.runSingleTaskLoop(threadName);
        });
        this.backThreads.add(backThread);
    }

    /**
     * Multi-worker mode: starts {@code threadNum} threads that drain the queue in batches of at
     * most {@code itemCountMax} tasks.
     *
     * <p>The queue itself is unbounded; instead a callback registered with
     * {@code MinuteTimer.of()} discards the whole backlog whenever it finds more than
     * {@code queueCapacity} queued tasks.
     * NOTE(review): this callback is never unregistered on close() - confirm whether
     * MinuteTimer offers a way to remove it.
     */
    private QueueConsumer(String threadName, int queueCapacity, int threadNum, int itemCountMax) {
        this.threadName = threadName;
        LinkedTransferQueue<ITask> queue = new LinkedTransferQueue<ITask>();
        MinuteTimer.of().add(() -> {
            // size() walks the whole LinkedTransferQueue, so read it once.
            int backlog = queue.size();
            if (backlog > queueCapacity) {
                System.out.println("WARN:" + LocalDateTime.now() + ", " + threadName + " clearQueue:" + backlog);
                queue.clear();
            }
        });
        this.taskQueue = queue;
        this.threadNum = threadNum;
        for (int i = 0; i < threadNum; ++i) {
            String name = threadName + i;
            Thread backThread = GonglerUtil.StartDaemonThread(name, () -> {
                this.runBatchLoop(name, itemCountMax);
            });
            this.backThreads.add(backThread);
        }
    }

    /** Worker body for single-worker mode: block on the queue and run tasks one by one. */
    private void runSingleTaskLoop(String name) {
        try {
            while (!this.cancel) {
                this.taskQueue.take().executeWithCatchAny();
            }
        } catch (InterruptedException ignored) {
            // Interruption is the stop signal sent by close(); fall through and let the thread end.
        } finally {
            System.out.println("Exception: Thread:" + name + " is exited.");
        }
    }

    /** Worker body for multi-worker mode: run tasks in batches, blocking only when idle. */
    private void runBatchLoop(String name, int itemCountMax) {
        try {
            List<ITask> tasks = new ArrayList<ITask>();
            while (!this.cancel) {
                int count = this.taskQueue.drainTo(tasks, itemCountMax);
                if (count == 0) {
                    // Nothing queued: block until a task arrives rather than spinning on drainTo.
                    this.taskQueue.take().executeWithCatchAny();
                    continue;
                }
                for (ITask task : tasks) {
                    task.executeWithCatchAny();
                }
                tasks.clear();
            }
        } catch (InterruptedException ignored) {
            // Interruption is the stop signal sent by close(); fall through and let the thread end.
        } finally {
            System.out.println("Exception: Thread:" + name + " is exited.");
        }
    }

    /**
     * Submits a task for background execution.
     *
     * <p>The result of {@code offer} is not checked, so when a bounded queue is full the task is
     * dropped silently; it is still counted by {@link #acceptTaskCount()}.
     *
     * @param task task to enqueue
     */
    @Override
    public void accept(ITask task) {
        this.taskCount.incrementAndGet();
        this.taskQueue.offer(task);
    }

    /**
     * @return how many times {@link #accept(ITask)} has been called
     */
    public long acceptTaskCount() {
        return this.taskCount.get();
    }

    /**
     * @return number of tasks currently waiting in the queue
     */
    public int size() {
        return this.taskQueue.size();
    }

    /**
     * Adapts a data handler into a {@link Consumer} that enqueues one task per data item; the
     * task invokes {@code dataHandler.accept(data)} when a worker runs it.
     *
     * @param dataHandler handler to run on a worker thread
     * @param <D>         type of the data items
     * @return a consumer that queues the handling of each item it receives
     */
    public <D> Consumer<D> toView(ExceptionConsumer<D> dataHandler) {
        return data -> this.accept(() -> dataHandler.accept(data));
    }

    /**
     * @return a read-only view of the pending tasks
     */
    public Collection<ITask> getQueue() {
        return Collections.unmodifiableCollection(this.taskQueue);
    }

    /**
     * Requests all workers to stop: sets the stop flag, then interrupts every worker so one
     * blocked in {@code take()} wakes up. Tasks still in the queue are not executed.
     */
    @Override
    public void close() {
        this.cancel = true;
        for (Thread worker : this.backThreads) {
            worker.interrupt();
        }
    }

    @Override
    public String toString() {
        return this.threadName + "_Thread:" + this.threadNum + "_queueSize:" + this.size();
    }
}

