/*
 * Decompiled with CFR 0.152.
 */
package cn.ymotel.dactor.core.disruptor;

import cn.ymotel.dactor.core.disruptor.MessageEvent;
import cn.ymotel.dactor.core.disruptor.MessageRingBufferDispatcher;
import cn.ymotel.dactor.core.disruptor.Sentinel;
import cn.ymotel.dactor.core.disruptor.WorkProcessorManager;
import cn.ymotel.dactor.message.Message;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class RingBufferManager
implements InitializingBean,
ApplicationContextAware {
    private MessageRingBufferDispatcher messageRingBufferDispatcher;
    private ApplicationContext appcontext = null;
    private int minsize = -1;
    private int maxsize = -1;
    RingBuffer<MessageEvent> ringBuffer;
    private int bufferSize = 1024;
    private WaitStrategy strategy = new BlockingWaitStrategy();
    private ConcurrentLinkedQueue<Message> quene = new ConcurrentLinkedQueue();
    private Semaphore semaphore = new Semaphore(0);
    private int checktime = 1000;
    private WorkProcessorManager workProcessorManager = null;
    private Sentinel sentinel = new Sentinel();
    private Disruptor disruptor = null;
    private ScheduledExecutorService scheduledExecutorService = null;
    ExecutorService executor = null;
    private boolean isshutdown = false;

    public void setMessageRingBufferDispatcher(MessageRingBufferDispatcher messageRingBufferDispatcher) {
        this.messageRingBufferDispatcher = messageRingBufferDispatcher;
    }

    public void setMaxsize(int maxsize) {
        this.maxsize = maxsize;
    }

    public void setChecktime(int checktime) {
        this.checktime = checktime;
    }

    public void setMinsize(int minsize) {
        this.minsize = minsize;
    }

    public RingBuffer<MessageEvent> getRingBuffer() {
        return this.ringBuffer;
    }

    public void setRingBuffer(RingBuffer<MessageEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public int getBufferSize() {
        return this.bufferSize;
    }

    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public WaitStrategy getStrategy() {
        return this.strategy;
    }

    public void setStrategy(WaitStrategy strategy) {
        this.strategy = strategy;
    }

    public int getProcessorsize() {
        return this.workProcessorManager.getProcessorList().size();
    }

    public boolean putMessage(Message message, boolean blocked) {
        if (!this.quene.isEmpty() && !blocked) {
            return false;
        }
        long seq = 0L;
        try {
            seq = this.ringBuffer.tryNext();
        }
        catch (InsufficientCapacityException e) {
            if (!blocked) {
                return false;
            }
            this.quene.add(message);
            this.semaphore.release();
            return true;
        }
        ((MessageEvent)this.ringBuffer.get(seq)).setMessage(message);
        this.ringBuffer.publish(seq);
        return true;
    }

    private void putQueneMessage() {
        long seq = this.ringBuffer.next();
        Message message = this.quene.poll();
        ((MessageEvent)this.ringBuffer.get(seq)).setMessage(message);
        this.ringBuffer.publish(seq);
    }

    public Sentinel getSentinel() {
        return this.sentinel;
    }

    public void afterPropertiesSet() throws Exception {
        EventFactory<MessageEvent> factory = new EventFactory<MessageEvent>(){

            public MessageEvent newInstance() {
                return new MessageEvent();
            }
        };
        this.executor = Executors.newCachedThreadPool();
        this.executor.submit(new Runnable(){

            @Override
            public void run() {
                block2: while (true) {
                    try {
                        while (!RingBufferManager.this.isshutdown) {
                            RingBufferManager.this.semaphore.acquire();
                            if (RingBufferManager.this.isshutdown) break block2;
                            RingBufferManager.this.putQueneMessage();
                        }
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                        continue;
                    }
                    break;
                }
            }
        });
        this.disruptor = new Disruptor((EventFactory)factory, this.bufferSize, (ThreadFactory)DaemonThreadFactory.INSTANCE, ProducerType.MULTI, this.strategy);
        this.ringBuffer = this.disruptor.start();
        this.workProcessorManager = new WorkProcessorManager(this.executor, this.messageRingBufferDispatcher, this.appcontext, this.ringBuffer, this.sentinel);
        this.sentinel.setMaxsize(this.maxsize);
        this.sentinel.setMinsize(this.minsize);
        this.sentinel.setWorkProcessorManager(this.workProcessorManager);
        this.workProcessorManager.incrConsumer(this.minsize);
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.scheduledExecutorService.scheduleAtFixedRate(this.sentinel, this.checktime, this.checktime, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        this.isshutdown = true;
        this.semaphore.release();
        this.workProcessorManager.shutdown();
        this.disruptor.shutdown();
        this.executor.shutdown();
        this.scheduledExecutorService.shutdown();
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.appcontext = applicationContext;
    }
}

