/*
 * Decompiled with CFR 0.152.
 */
package io.polaris.concurrent.pool;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.polaris.core.concurrent.pool.ConsumerDelegates;
import io.polaris.core.concurrent.pool.ResourceableConsumer;
import io.polaris.core.concurrent.pool.RunnableStatistics;
import io.polaris.core.concurrent.pool.RunnableStatisticsHolder;
import io.polaris.core.concurrent.pool.TransactionConsumer;
import io.polaris.core.log.Logger;
import io.polaris.core.log.Loggers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * A pool of consumers fed through an LMAX {@link Disruptor} ring buffer.
 *
 * <p>Typical life cycle: configure ({@link #setName(String)}, {@link #setRingBufferSize(int)},
 * {@link #setErrorLimit(int)}, {@link #setOpenStatistics(boolean)}), register consumers with
 * one of the {@code addConsumer} overloads, call {@link #start()}, publish items with
 * {@code offer(...)}, and finally call {@link #shutdown()}.
 *
 * <p>The disruptor is created with {@link ProducerType#SINGLE}, so {@code offer(...)} must not
 * be invoked concurrently from several threads.
 *
 * @param <E> type of the items handed to the consumers
 */
public class DisruptorPooledExecutor<E>
implements RunnableStatisticsHolder {
    private static final Logger log = Loggers.of(DisruptorPooledExecutor.class);
    /** Smallest ring buffer size this class will configure. */
    private static final int MIN_RING_BUFFER_SIZE = 16;
    /** Largest power of two that still fits in a positive {@code int}. */
    private static final int MAX_RING_BUFFER_SIZE = 1 << 30;

    private int errorLimit = -1;
    private boolean openStatistics = false;
    /** Written last in {@link #start()} so that a reader seeing {@code true} also sees the disruptor. */
    private volatile boolean running = false;
    private RunnableStatistics statistics;
    private final List<Consumer<E>> consumers = new ArrayList<>();
    private String name;
    private int ringBufferSize = 4096;
    private Disruptor<PooledEvent<E>> disruptor;

    /**
     * Returns the statistics object created by the most recent {@link #start()}.
     *
     * @return the statistics, or {@code null} when none has been created
     */
    public RunnableStatistics runnableStatistics() {
        return this.statistics;
    }

    /**
     * Sets the ring buffer size, rounded up to a power of two in the range 16 to 2^30.
     * Non-positive values are ignored and leave the current size untouched.
     *
     * @param ringBufferSize requested minimum number of slots
     */
    public void setRingBufferSize(int ringBufferSize) {
        if (ringBufferSize <= 0) {
            return;
        }
        int size = MIN_RING_BUFFER_SIZE;
        // Stop at 2^30: one more shift overflows to a negative value and then to 0, which
        // would keep "size < ringBufferSize" true forever.
        while (size < ringBufferSize && size < MAX_RING_BUFFER_SIZE) {
            size <<= 1;
        }
        this.ringBufferSize = size;
    }

    private void addConsumerInner(Consumer<E> consumer) {
        this.consumers.add(consumer);
    }

    /**
     * Registers one worker for the given consumer.
     *
     * @param consumer callback invoked with each item assigned to this worker
     */
    public void addConsumer(Consumer<E> consumer) {
        this.addConsumer(1, consumer);
    }

    /**
     * Registers one worker for the given transaction consumer.
     *
     * @param consumer transaction-style callback
     * @param <Resource> resource type used by the transaction consumer
     */
    public <Resource> void addConsumer(TransactionConsumer<E, Resource> consumer) {
        this.addConsumer(1, consumer);
    }

    /**
     * Registers {@code count} workers, each wrapping {@code consumer} in its own delegate.
     * Handlers are built from the registered consumers inside {@link #start()}, so register
     * consumers before starting.
     *
     * @param count number of workers to register; nothing is registered when not positive
     * @param consumer callback shared by the registered workers
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void addConsumer(int count, Consumer<E> consumer) {
        for (int k = 0; k < count; ++k) {
            // The delegate's declared type is not visible here, hence the raw local and the cast.
            Consumer delegate = ConsumerDelegates.createDelegate((RunnableStatisticsHolder) this, consumer);
            this.addConsumerInner((Consumer<E>) delegate);
        }
    }

    /**
     * Registers {@code count} workers, each wrapping {@code consumer} in its own
     * {@link ResourceableConsumer} delegate; these delegates are opened in {@link #start()}
     * and closed in {@link #shutdown()}.
     *
     * @param count number of workers to register; nothing is registered when not positive
     * @param consumer transaction-style callback shared by the registered workers
     * @param <Resource> resource type used by the transaction consumer
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <Resource> void addConsumer(int count, TransactionConsumer<E, Resource> consumer) {
        for (int k = 0; k < count; ++k) {
            ResourceableConsumer delegate = ConsumerDelegates.createDelegate((RunnableStatisticsHolder) this, consumer);
            this.addConsumerInner((Consumer<E>) delegate);
        }
    }

    /**
     * Tells whether the current statistics object reports that the error limit is exceeded.
     *
     * @return {@code false} when no statistics object exists
     */
    public boolean isExceedErrorLimit() {
        return this.statistics != null && this.statistics.isExceedErrorLimit();
    }

    /**
     * Opens the resource-aware consumers, builds the disruptor with one work handler per
     * registered consumer and starts it.
     *
     * <p>If anything fails on the way, consumers that were already opened are closed again and
     * the executor stays in the stopped state, so {@code start()} may be retried.
     *
     * @throws IllegalStateException if the executor is already running
     */
    public void start() {
        if (this.running) {
            throw new IllegalStateException("\u6b63\u5728\u8fd0\u884c\u4e2d");
        }
        if (this.openStatistics || this.errorLimit >= 0) {
            this.statistics = new RunnableStatistics(this.errorLimit);
        }
        int size = this.consumers.size();
        @SuppressWarnings({"unchecked", "rawtypes"})
        PooledEventHandler<E>[] handlers = new PooledEventHandler[size];
        int opened = 0;
        boolean started = false;
        try {
            for (int i = 0; i < size; ++i) {
                Consumer<E> consumer = this.consumers.get(i);
                if (consumer instanceof ResourceableConsumer) {
                    ((ResourceableConsumer) consumer).open();
                }
                opened = i + 1;
                handlers[i] = new PooledEventHandler<>(consumer);
            }
            ThreadFactory threadFactory = new PooledThreadFactory(this.name);
            Disruptor<PooledEvent<E>> created = new Disruptor<>(
                    new PooledEventFactory<E>(),
                    this.ringBufferSize,
                    threadFactory,
                    ProducerType.SINGLE,
                    new SleepingWaitStrategy());
            created.handleEventsWithWorkerPool(handlers);
            created.start();
            this.disruptor = created;
            // Flip the flag only once the disruptor is usable; offer() checks it first.
            this.running = true;
            started = true;
        } finally {
            if (!started) {
                this.closeConsumers(opened);
            }
        }
    }

    /**
     * Publishes every element of {@code datas} in iteration order.
     *
     * @param datas items to publish
     * @throws IllegalStateException under the same conditions as {@link #offer(Object)}
     */
    public void offer(Iterable<E> datas) {
        for (E data : datas) {
            this.offer(data);
        }
    }

    /**
     * Publishes every element of {@code datas} in array order.
     *
     * @param datas items to publish
     * @throws IllegalStateException under the same conditions as {@link #offer(Object)}
     */
    public void offer(E ... datas) {
        for (E data : datas) {
            this.offer(data);
        }
    }

    /**
     * Publishes a single item to the ring buffer.
     *
     * @param data item to publish
     * @throws IllegalStateException if the executor is not running or the error limit is exceeded
     */
    public void offer(E data) {
        if (!this.running) {
            throw new IllegalStateException("\u72b6\u6001\u5df2\u505c\u6b62");
        }
        if (this.isExceedErrorLimit()) {
            throw new IllegalStateException("\u5904\u7406\u5931\u8d25\u6570\u91cf\u8d85\u9650(" + this.getErrorLimit() + ")");
        }
        // Read once: shutdown() clears the field, and a stale null must not surface as an NPE.
        Disruptor<PooledEvent<E>> current = this.disruptor;
        if (current == null) {
            throw new IllegalStateException("\u72b6\u6001\u5df2\u505c\u6b62");
        }
        current.publishEvent(this::translateTo, data);
    }

    /**
     * Stops accepting items, shuts the disruptor down and closes the resource-aware consumers.
     * Does nothing beyond clearing the running flag when the executor was never started or has
     * already been shut down. Failures are logged, not propagated.
     */
    public void shutdown() {
        this.running = false;
        Disruptor<PooledEvent<E>> current = this.disruptor;
        if (current == null) {
            return;
        }
        log.info("shutdown...");
        try {
            current.shutdown();
        }
        catch (Exception e) {
            log.error("failed to shut down disruptor", (Throwable) e);
        }
        finally {
            // Consumers must be released even when the disruptor refuses to stop cleanly.
            this.disruptor = null;
            this.closeConsumers(this.consumers.size());
        }
        log.info("shutdown!");
    }

    /**
     * Closes the first {@code count} registered consumers that are resource-aware. A failure
     * while closing one consumer is logged and does not prevent closing the others.
     */
    private void closeConsumers(int count) {
        for (int i = 0; i < count; ++i) {
            Consumer<E> consumer = this.consumers.get(i);
            if (!(consumer instanceof ResourceableConsumer)) {
                continue;
            }
            try {
                ((ResourceableConsumer) consumer).close();
            }
            catch (Exception e) {
                log.error("failed to close consumer #" + i, (Throwable) e);
            }
        }
    }

    /** Event translator used by {@link #offer(Object)}: stores the item in the claimed slot. */
    void translateTo(PooledEvent<E> event, long sequence, E task) {
        event.setData(task);
    }

    /**
     * Sets the error limit handed to {@link RunnableStatistics} on the next {@link #start()}.
     * A value of zero or more also causes a statistics object to be created.
     *
     * @param errorLimit the limit; the default is {@code -1}
     */
    public void setErrorLimit(int errorLimit) {
        this.errorLimit = errorLimit;
    }

    /** @return the configured error limit */
    public int getErrorLimit() {
        return this.errorLimit;
    }

    /**
     * Requests that a statistics object be created on the next {@link #start()} even when no
     * error limit is configured.
     *
     * @param openStatistics {@code true} to create statistics
     */
    public void setOpenStatistics(boolean openStatistics) {
        this.openStatistics = openStatistics;
    }

    /** @return whether statistics were requested explicitly */
    public boolean isOpenStatistics() {
        return this.openStatistics;
    }

    /**
     * Sets the prefix used for worker thread names; {@code null} or empty keeps the default.
     *
     * @param name thread name prefix
     */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the configured thread name prefix, possibly {@code null} */
    public String getName() {
        return this.name;
    }

    /**
     * Creates threads named {@code <name>-<poolId>-<seq>}, where {@code poolId} is unique per
     * factory instance and {@code seq} counts the threads created by this factory.
     */
    static class PooledThreadFactory
    implements ThreadFactory {
        private static final AtomicLong pool = new AtomicLong(0L);
        private final AtomicLong seq = new AtomicLong(0L);
        private final long poolId = pool.incrementAndGet();
        private String name = "disruptor-pooled";

        public PooledThreadFactory(String name) {
            if (name != null && name.length() > 0) {
                this.name = name;
            }
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(this.name + (this.poolId > 0L ? "-" + this.poolId + "-" : "-") + this.seq.incrementAndGet());
            return t;
        }
    }

    /** Pre-allocates the empty slots of the ring buffer. */
    static class PooledEventFactory<E>
    implements EventFactory<PooledEvent<E>> {
        PooledEventFactory() {
        }

        @Override
        public PooledEvent<E> newInstance() {
            return new PooledEvent<>();
        }
    }

    /** Adapts a {@link Consumer} to the disruptor's {@link WorkHandler} contract. */
    static class PooledEventHandler<E>
    implements WorkHandler<PooledEvent<E>> {
        private final Consumer<E> consumer;

        public PooledEventHandler(Consumer<E> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onEvent(PooledEvent<E> event) throws Exception {
            try {
                this.consumer.accept(event.getData());
            }
            finally {
                // Drop the payload so the ring buffer does not pin it until the slot is reused.
                event.setData(null);
            }
        }
    }

    /** Mutable ring buffer slot carrying one item. */
    static class PooledEvent<E> {
        private E data;

        PooledEvent() {
        }

        public E getData() {
            return this.data;
        }

        public void setData(E data) {
            this.data = data;
        }
    }
}

