/*
 * Decompiled with CFR 0.152.
 */
package eventcenter.leveldb;

import eventcenter.api.CommonEventSource;
import eventcenter.api.EventCenterConfig;
import eventcenter.api.EventListener;
import eventcenter.api.EventListenerTask;
import eventcenter.api.async.EventQueue;
import eventcenter.api.async.QueueEventContainer;
import eventcenter.api.tx.EventTxnStatus;
import eventcenter.api.tx.ResumeTxnHandler;
import eventcenter.leveldb.EventSourceWrapper;
import eventcenter.leveldb.LevelDBQueue;
import eventcenter.leveldb.TxEventListenerTask;
import eventcenter.leveldb.tx.TransactionConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.WriteBatch;

public class LevelDBContainer
extends QueueEventContainer {
    /** Worker pool that runs listener tasks; assigned in {@link #startup()}. */
    protected ThreadPoolExecutor threadPool;
    /** Thread that pulls events from the queue and submits them to the pool; assigned in {@link #startup()}. */
    protected ListenQueueThread listenQueueThread;
    /** Core pool size; defaults to the number of available processors. */
    protected int corePoolSize = Runtime.getRuntime().availableProcessors();
    /** Maximum pool size; defaults to twice the core size as it was when this object was constructed. */
    protected int maximumPoolSize = this.corePoolSize * 2;
    /** Keep-alive of idle pool threads; handed to the pool with {@code TimeUnit.SECONDS}. */
    protected int keepAliveTime = 60;
    /** Monitor on which the listen thread waits when the pool rejects a task; workers notify it after each task. */
    protected final Object locker = new Object();
    /** Bounded work queue of the pool, created in {@link #createThreadPool()} (field name is a historical misspelling). */
    protected ArrayBlockingQueue<Runnable> bockingQueue;
    /** Multiplier applied to {@link #maximumPoolSize} to derive the work-queue capacity in {@link #startup()}. */
    protected float blockingQueueFactor = 0.1f;
    /** Work-queue capacity; computed in {@link #startup()} and never smaller than {@link #corePoolSize}. */
    protected int blockingQueueSize;
    /** Timeout passed to the queue's transfer call and to the wait performed after a rejected submission. */
    protected long loopQueueInterval = 1000L;
    /** When non-null, the listen thread consumes events through the transactional path. */
    protected TransactionConfig transactionConfig;
    /** Incremented after each successful submission to the pool and decremented after each task finishes. */
    protected final AtomicLong counter = new AtomicLong(0L);

    /**
     * Creates the container; both arguments are passed straight to the superclass constructor.
     *
     * @param config event center configuration
     * @param queue  backing queue; other methods of this class cast it to {@code LevelDBQueue}
     */
    public LevelDBContainer(EventCenterConfig config, EventQueue queue) {
        super(config, queue);
    }

    /**
     * Creates the bounded work queue (capacity {@link #blockingQueueSize}) and the worker pool
     * that uses it. The pool shares {@link #locker} so that finishing tasks can wake the listen
     * thread.
     *
     * @return a new pool configured from the current size and keep-alive settings
     */
    protected ThreadPoolExecutor createThreadPool() {
        // Explicit type argument instead of the raw type, so the assignment is checked.
        this.bockingQueue = new ArrayBlockingQueue<Runnable>(this.blockingQueueSize);
        return new InnerThreadPool(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, TimeUnit.SECONDS, this.bockingQueue, this.locker);
    }

    /** @return the configured core pool size */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /** @param corePoolSize core pool size used when the pool is next created */
    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    /** @return the configured maximum pool size */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /** @param maximumPoolSize maximum pool size used when the pool is next created */
    public void setMaximumPoolSize(int maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }

    /** @return the keep-alive value handed to the pool (in seconds) */
    public int getKeepAliveTime() {
        return keepAliveTime;
    }

    /** @param keepAliveTime keep-alive value handed to the pool (in seconds) */
    public void setKeepAliveTime(int keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    /** @return the multiplier used to derive the work-queue capacity from the maximum pool size */
    public float getBlockingQueueFactor() {
        return blockingQueueFactor;
    }

    /** @param blockingQueueFactor multiplier used to derive the work-queue capacity */
    public void setBlockingQueueFactor(float blockingQueueFactor) {
        this.blockingQueueFactor = blockingQueueFactor;
    }

    /** @return the timeout used for queue transfers and for waiting on a saturated pool */
    public long getLoopQueueInterval() {
        return loopQueueInterval;
    }

    /** @param loopQueueInterval timeout used for queue transfers and for waiting on a saturated pool */
    public void setLoopQueueInterval(long loopQueueInterval) {
        this.loopQueueInterval = loopQueueInterval;
    }

    /** @return the transaction configuration, or {@code null} when none has been set */
    public TransactionConfig getTransactionConfig() {
        return transactionConfig;
    }

    /** @param transactionConfig transaction configuration; {@code null} selects the non-transactional path */
    public void setTransactionConfig(TransactionConfig transactionConfig) {
        this.transactionConfig = transactionConfig;
    }

    /**
     * Sizes the work queue, creates the worker pool and the listen thread, opens the LevelDB
     * queue and finally starts the listen thread.
     *
     * @throws Exception propagated from opening the queue
     */
    public void startup() throws Exception {
        // Capacity is maximumPoolSize * factor, but never below the core pool size.
        int scaledCapacity = (int)((float)this.maximumPoolSize * this.blockingQueueFactor);
        this.blockingQueueSize = Math.max(scaledCapacity, this.corePoolSize);

        this.threadPool = this.createThreadPool();
        this.listenQueueThread = new ListenQueueThread(this.locker, this);

        LevelDBQueue levelDbQueue = this.getLevelDBQueue();
        if (null != this.transactionConfig) {
            levelDbQueue.setTxnCapacity(this.maximumPoolSize + this.blockingQueueSize + 100);
        }
        levelDbQueue.open();

        this.listenQueueThread.start();
        this.logger.info("leveldb container startup success");
    }

    /**
     * Stops the listen thread, shuts the worker pool down immediately, closes the LevelDB queue
     * and wakes anything still waiting on {@link #locker}.
     *
     * <p>Safe to call when {@link #startup()} never ran: the listen thread and the pool are only
     * touched if they were created.
     *
     * @throws Exception propagated from closing the queue
     */
    public void shutdown() throws Exception {
        if (null != this.listenQueueThread) {
            this.listenQueueThread.close();
        }
        if (null != this.threadPool) {
            this.threadPool.shutdownNow();
        }
        this.getLevelDBQueue().close();
        synchronized (this.locker) {
            this.locker.notifyAll();
        }
        this.logger.info("leveldb container closed success");
    }

    /**
     * Always reports {@code true} for this container.
     *
     * @return {@code true}
     */
    public boolean isPersisted() {
        return true;
    }

    /**
     * Tells whether fewer tasks are in flight than the configured maximum pool size.
     *
     * @return {@code true} when the in-flight counter is below {@link #maximumPoolSize}
     */
    public boolean isIdle() {
        final long inFlight = this.counter.get();
        return inFlight < (long)this.maximumPoolSize;
    }

    /**
     * Commits the given transaction on the LevelDB queue, unless the listen thread has already
     * been told to stop. Any exception is logged and not rethrown.
     *
     * @param txn transaction status to commit
     */
    public void commitTransaction(EventTxnStatus txn) {
        try {
            // Once the listen thread's flag is cleared, commits are skipped.
            if (this.listenQueueThread.flag) {
                this.getLevelDBQueue().commit(txn);
            }
        }
        catch (Exception e) {
            this.logger.error("commit txn[" + txn.getEventId() + "] error:" + e.getMessage(), e);
        }
    }

    /**
     * Returns the inherited {@code queue} field cast to {@code LevelDBQueue}.
     *
     * @return the backing queue as a {@code LevelDBQueue}
     */
    private LevelDBQueue getLevelDBQueue() {
        return (LevelDBQueue)this.queue;
    }

    /**
     * Checks whether the transaction queue component still holds uncommitted transactions.
     *
     * @return {@code true} when a transaction config is set and at least one transaction is
     *         pending; {@code false} when no config is set, nothing is pending, or the count
     *         could not be read (the failure is logged)
     */
    boolean checkUncommitTransactions() {
        if (null == this.transactionConfig) {
            return false;
        }
        try {
            int count = this.getLevelDBQueue().getTxnQueueComponent().countOfTxn();
            if (count <= 0) {
                return false;
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("found " + count + " txn not committed.it would resume txn thread");
            }
            return true;
        }
        catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            this.logger.error("check txn queue component's count error:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * Finds, among the async listeners registered for the event, the one whose runtime class is
     * exactly {@code type}.
     *
     * @param message event whose listeners are looked up through the superclass
     * @param type    exact listener class to match (compared by identity, subclasses do not match)
     * @return the first matching listener, or {@code null} when there are no listeners or none
     *         matches
     */
    protected EventListener findAsyncEventListeners(CommonEventSource message, Class<? extends EventListener> type) {
        List<EventListener> listeners = super.findAsyncEventListeners(message);
        // The lookup result is null-checked elsewhere in this class, so guard here as well.
        if (null == listeners || listeners.isEmpty()) {
            return null;
        }
        for (EventListener listener : listeners) {
            if (listener.getClass() == type) {
                return listener;
            }
        }
        return null;
    }

    /**
     * Reports the maximum pool size of the live worker pool.
     *
     * @return the pool's maximum size, or {@code 0} when no pool has been created yet
     */
    public int countOfMaxConcurrent() {
        final ThreadPoolExecutor pool = this.threadPool;
        return pool == null ? 0 : pool.getMaximumPoolSize();
    }

    /**
     * Reports the current value of the in-flight task counter, narrowed to {@code int}.
     *
     * @return the counter value as an {@code int}
     */
    public int countOfLiveThread() {
        return (int)this.counter.get();
    }

    /**
     * Reports how many tasks are currently sitting in the pool's work queue.
     *
     * @return the work-queue size, or {@code 0} when no pool has been created yet
     */
    public int countOfQueueBuffer() {
        final ThreadPoolExecutor pool = this.threadPool;
        return pool == null ? 0 : pool.getQueue().size();
    }

    class ListenQueueThread
    extends Thread {
        /** Monitor used to wait for a free pool slot and to be woken on close. */
        private final Object locker;
        /** Run flag: the loops in this thread keep going while it is {@code true}. */
        private volatile boolean flag = true;
        /** Container handed to transactional listener tasks. */
        private final LevelDBContainer containerRef;

        /**
         * Creates the thread, named {@code "listen-queue-thread"}; it is not started here.
         *
         * @param locker       monitor to wait on when the pool rejects a task
         * @param containerRef container passed to transactional tasks
         */
        public ListenQueueThread(Object locker, LevelDBContainer containerRef) {
            super("listen-queue-thread");
            this.containerRef = containerRef;
            this.locker = locker;
        }

        /**
         * Thread body: first replays any uncommitted transactions, then keeps pulling events
         * from the queue (transactional path when a transaction config is set, plain path
         * otherwise) until the run flag is cleared. Failures inside one iteration are logged
         * and the loop continues.
         */
        @Override
        public void run() {
            if (LevelDBContainer.this.checkUncommitTransactions()) {
                this.resumeUncommittedTransactions();
            }
            while (this.flag) {
                try {
                    if (null == LevelDBContainer.this.transactionConfig) {
                        this.listenQueue();
                    } else {
                        this.listenQueueWithTxn();
                    }
                }
                catch (Throwable e) {
                    // Log the throwable itself, not only its message, so the stack trace survives.
                    LevelDBContainer.this.logger.fatal("happend fatal error on queue.transfer:" + e.getMessage(), e);
                }
            }
        }

        /**
         * Asks the transaction queue component to resume pending transactions and re-dispatches
         * each resumed event to the listener whose class matches the stored listener type.
         * Any failure is logged and swallowed so the main loop can still start.
         */
        private void resumeUncommittedTransactions() {
            try {
                LevelDBContainer.this.getLevelDBQueue().getTxnQueueComponent().resumeTxn(new ResumeTxnHandler(){

                    public void resume(EventTxnStatus status, CommonEventSource event) {
                        EventListener listener = LevelDBContainer.this.findAsyncEventListeners(event, status.getListenerType());
                        if (null == listener) {
                            LevelDBContainer.this.logger.warn(new StringBuilder("txnId[").append(status.getTxnId()).append("], evt:").append(event).append(" can't found listener:").append(status.getListenerType()).append(", resume failure"));
                            return;
                        }
                        ListenQueueThread.this.consumeEvent(event, listener, status);
                        if (LevelDBContainer.this.logger.isDebugEnabled()) {
                            LevelDBContainer.this.logger.debug(String.format("resumed %s event success.", event.toString()));
                        }
                    }
                });
                if (LevelDBContainer.this.logger.isDebugEnabled()) {
                    LevelDBContainer.this.logger.debug("resumed uncommitted txn success!");
                }
            }
            catch (Throwable e) {
                LevelDBContainer.this.logger.fatal("resume uncommitted transaction failure:" + e.getMessage(), e);
            }
        }

        /**
         * Non-transactional poll: takes one event from the queue (waiting up to the loop
         * interval) and, if it has async listeners, dispatches it to all of them.
         */
        private void listenQueue() {
            final long begunAt = System.currentTimeMillis();
            final CommonEventSource polled = LevelDBContainer.this.queue.transfer(LevelDBContainer.this.loopQueueInterval);
            if (polled == null) {
                // Nothing arrived within the interval.
                return;
            }
            if (LevelDBContainer.this.logger.isTraceEnabled()) {
                StringBuilder message = new StringBuilder("transfer evt:");
                message.append(polled).append(" success. wait:").append(System.currentTimeMillis() - begunAt).append(" ms.");
                LevelDBContainer.this.logger.trace(message);
            }
            final List<EventListener> targets = LevelDBContainer.this.findAsyncEventListeners(polled);
            if (targets != null && targets.size() != 0) {
                this.consumeEvent(polled, targets, null);
            }
        }

        /**
         * Transactional poll: takes one event from the queue into a write batch, records a
         * transaction status per listener in the same batch, writes the batch, and then
         * dispatches the event to its listeners together with their transaction statuses.
         *
         * <p>The batch is written even when no event arrived or a step failed, and it is closed
         * in every case, including when the write itself throws.
         */
        private void listenQueueWithTxn() {
            long start = System.currentTimeMillis();
            DB db = LevelDBContainer.this.getLevelDBQueue().getQueueMiddleComponent().getDB();
            WriteBatch wb = db.createWriteBatch();
            CommonEventSource evt = null;
            List<EventListener> listeners = null;
            List<EventTxnStatus> txnList = null;
            try {
                try {
                    evt = LevelDBContainer.this.getLevelDBQueue().transfer(LevelDBContainer.this.loopQueueInterval, wb);
                    if (null == evt) {
                        return;
                    }
                    if (LevelDBContainer.this.logger.isTraceEnabled()) {
                        LevelDBContainer.this.logger.trace(new StringBuilder("transfer evt:").append(evt).append(" success. wait:").append(System.currentTimeMillis() - start).append(" ms."));
                    }
                    listeners = LevelDBContainer.this.findAsyncEventListeners(evt);
                    if (null == listeners || listeners.size() == 0) {
                        return;
                    }
                    txnList = this.beginTransaction(evt, listeners, wb);
                }
                finally {
                    db.write(wb);
                }
            }
            finally {
                // Outer finally: the batch is released even if db.write(...) above throws.
                try {
                    wb.close();
                }
                catch (Throwable e) {
                    LevelDBContainer.this.logger.error("consume event with txn failure:" + e.getMessage(), e);
                }
            }
            // Only reached with a non-empty listener list; the early returns above cover the rest.
            this.consumeEvent(evt, listeners, txnList);
        }

        /**
         * Dispatches one event to every listener in order. When transaction statuses are
         * supplied, the status at the same position is passed along with each listener; if a
         * status cannot be fetched the error is logged and the listener is dispatched without
         * one.
         *
         * @param evt       event to dispatch
         * @param listeners listeners to dispatch to
         * @param txns      per-listener transaction statuses, or {@code null}/empty for none
         */
        private void consumeEvent(CommonEventSource evt, List<EventListener> listeners, List<EventTxnStatus> txns) {
            final boolean hasTxns = txns != null && txns.size() > 0;
            int cursor = 0;
            for (EventListener target : listeners) {
                EventTxnStatus status = null;
                if (hasTxns) {
                    try {
                        status = txns.get(cursor++);
                    }
                    catch (Exception e) {
                        LevelDBContainer.this.logger.error("get txn error:" + e.getMessage(), e);
                    }
                }
                this.consumeEvent(evt, target, status);
            }
        }

        /**
         * Submits one listener task to the worker pool, retrying while the pool rejects it and
         * this thread is still running. A wrapped event is unwrapped first; a transactional
         * task is used when a transaction status is given.
         *
         * @param evt      event (possibly an {@code EventSourceWrapper}) to deliver
         * @param listener listener to invoke
         * @param txn      transaction status for this delivery, or {@code null} for none
         */
        private void consumeEvent(CommonEventSource evt, EventListener listener, EventTxnStatus txn) {
            CommonEventSource payload = evt;
            if (payload instanceof EventSourceWrapper) {
                payload = ((EventSourceWrapper)payload).getEvt();
            }

            final Runnable task;
            if (txn == null) {
                task = new EventListenerTask(listener, payload);
            } else {
                task = new TxEventListenerTask(listener, payload, this.containerRef, txn);
            }

            while (this.flag) {
                try {
                    LevelDBContainer.this.threadPool.execute(task);
                    LevelDBContainer.this.counter.incrementAndGet();
                    return;
                }
                catch (RejectedExecutionException e) {
                    // Pool and queue are full: wait for a worker to finish, then retry.
                    if (LevelDBContainer.this.logger.isTraceEnabled()) {
                        LevelDBContainer.this.logger.trace(e.getMessage(), e);
                    }
                    this.waitForRelease(LevelDBContainer.this.loopQueueInterval);
                }
                catch (Throwable e) {
                    // Any other failure ends the attempt; a pending transaction is committed.
                    LevelDBContainer.this.logger.error(e.getMessage(), e);
                    if (null != txn) {
                        LevelDBContainer.this.commitTransaction(txn);
                    }
                    return;
                }
            }
        }

        /**
         * Obtains a transaction status for each listener of the event, recording them in the
         * given write batch.
         *
         * <p>The returned list is positionally aligned with {@code listeners}: when obtaining a
         * status fails for a listener, the error is logged and {@code null} is stored at that
         * position, so later listeners are never paired with another listener's transaction.
         *
         * @param evt        event being dispatched; must be an {@code EventSourceWrapper}
         * @param listeners  listeners that will receive the event
         * @param writeBatch batch passed to the queue when creating each status
         * @return one entry per listener, {@code null} where the status could not be created
         */
        private List<EventTxnStatus> beginTransaction(CommonEventSource evt, List<EventListener> listeners, WriteBatch writeBatch) {
            EventSourceWrapper wrapper = (EventSourceWrapper)evt;
            ArrayList<EventTxnStatus> txnList = new ArrayList<EventTxnStatus>(listeners.size() + 1);
            for (EventListener listener : listeners) {
                EventTxnStatus status = null;
                try {
                    status = LevelDBContainer.this.getLevelDBQueue().getTxnStatus(wrapper, wrapper.getTxnId(), listener, writeBatch);
                }
                catch (Exception e) {
                    LevelDBContainer.this.logger.error("getTxnStatus error:" + e.getMessage(), e);
                }
                // Always add, even when null, to keep indexes in step with the listener list.
                txnList.add(status);
            }
            return txnList;
        }

        /**
         * Blocks on the shared monitor until a worker notifies it, {@link #close()} is called,
         * or the timeout elapses.
         *
         * @param timeout value passed to {@code Object.wait(long)}; {@code -1} waits without a
         *                timeout
         */
        private void waitForRelease(long timeout) {
            synchronized (this.locker) {
                try {
                    if (timeout == -1L) {
                        this.locker.wait();
                    } else {
                        this.locker.wait(timeout);
                    }
                }
                catch (InterruptedException e) {
                    // Report through the logger rather than stderr. The interrupt status is
                    // deliberately not restored: this thread stops via its flag, and the caller
                    // retries in a loop where a still-set status would make every wait throw at once.
                    LevelDBContainer.this.logger.warn("wait for thread pool release interrupted:" + e.getMessage(), e);
                }
            }
        }

        /**
         * Clears the run flag and wakes every thread waiting on the shared monitor so the
         * loops in this thread can observe the flag and exit.
         */
        public void close() {
            this.flag = false;
            synchronized (this.locker) {
                this.locker.notifyAll();
            }
        }
    }

    /**
     * Worker pool that names its threads {@code ec-leveldb-queue-worker-N} and, after every
     * task, wakes waiters on the shared monitor and decrements the container's in-flight counter.
     */
    class InnerThreadPool
    extends ThreadPoolExecutor {
        /** Monitor notified after each task completes. */
        private final Object locker;

        /**
         * @param corePoolSize    core pool size
         * @param maximumPoolSize maximum pool size
         * @param keepAliveTime   idle keep-alive
         * @param unit            unit of {@code keepAliveTime}
         * @param workQueue       queue holding tasks awaiting a worker
         * @param locker          monitor to notify after each task
         */
        public InnerThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Object locker) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadFactory(){
                // Numbering starts at 1 and grows with every thread created.
                final AtomicInteger sequence = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    final String threadName = "ec-leveldb-queue-worker-" + this.sequence.getAndIncrement();
                    return new Thread(r, threadName);
                }
            });
            this.locker = locker;
        }

        /**
         * Runs the superclass hook, notifies all waiters on the monitor, then decrements the
         * in-flight counter.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            synchronized (this.locker) {
                this.locker.notifyAll();
            }
            LevelDBContainer.this.counter.decrementAndGet();
        }
    }
}

