/*
 * Decompiled with CFR 0.152.
 */
package eventcenter.leveldb.tx;

import eventcenter.api.CommonEventSource;
import eventcenter.api.EventListener;
import eventcenter.api.tx.EventTxnStatus;
import eventcenter.api.tx.ResumeTxnHandler;
import eventcenter.leveldb.tx.IndexIterator;
import eventcenter.leveldb.tx.LevelDBBucket;
import eventcenter.leveldb.tx.LevelDBEventTxnStatus;
import eventcenter.leveldb.tx.LockInvoker;
import eventcenter.leveldb.tx.TxnQueueInfo;
import eventcenter.leveldb.tx.TxnRef;
import eventcenter.remote.utils.StringHelper;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.WriteBatch;

/**
 * Keeps per-listener transaction status records for the events of one named
 * queue in a LevelDB database.
 *
 * <p>Every status created through {@link #getTxnStatus} is indexed in a paged
 * {@link LevelDBBucket} (the "txn bucket"), stored under its own key, and
 * linked to its {@code txnId} through a {@link TxnRef} that lists the ids of
 * all statuses created for that {@code txnId}. {@link #commit} removes a
 * status again and, once no status references the {@code txnId} any more,
 * deletes the stored event and the reference record.
 *
 * <p>Operations that stage or write data run under a single
 * {@link ReentrantLock} via {@code lockAndInvoke}, which also rejects calls
 * while the component is not open.
 */
public class TxnQueueComponent {
    /** Prefix of the LevelDB key under which this queue's {@link TxnQueueInfo} is stored. */
    private static final String KEY_INFO = "txn_queue_";
    private final String queueName;
    private final DB db;
    BucketGroup bucketGroup;
    private TxnQueueInfo txnQueueInfo;
    /** Used to size the pages of the txn bucket (see {@code BucketGroup.loadTxnBucket}). */
    private Integer txnCapacity = 1000;
    /** Used to size the pages of the failure bucket. */
    private Integer failureCapacity = 10000;
    /** Used to size the pages of the discard bucket. */
    private Integer discardCapacity = 10000;
    private Integer retryCount = 3;
    // Not read anywhere in this class.
    private Integer txnTimeout = 60;
    private boolean openRetry = false;
    /** Set by {@link #open()}, cleared by {@link #shutdown()}; checked before every locked operation. */
    private volatile boolean isOpen = false;
    /** {@code "txn_" + queueName}; common prefix of status keys and reference keys. */
    private final String indexPrefix;
    private final ReentrantLock lock = new ReentrantLock();
    private final Logger logger = Logger.getLogger(this.getClass());

    /**
     * Creates a component for the given queue. Nothing is read from the
     * database until {@link #open()} is called.
     *
     * @param queueName name of the queue; part of every key this component builds
     * @param db        LevelDB handle used for all reads and writes
     */
    public TxnQueueComponent(String queueName, DB db) {
        this.db = db;
        this.queueName = queueName;
        this.bucketGroup = new BucketGroup();
        this.indexPrefix = this.buildTxnPrefix();
    }

    /** Returns the LevelDB key of this queue's {@link TxnQueueInfo}. */
    private String buildTxnQueueInfoKey() {
        return KEY_INFO + this.queueName;
    }

    /**
     * Loads the queue info and the txn bucket and marks the component open.
     * Does nothing when the component is already open.
     *
     * @throws IOException if loading from the database fails
     */
    public void open() throws IOException {
        if (this.isOpen) {
            return;
        }
        this.loadInfo();
        this.loadBuckets();
        this.isOpen = true;
    }

    /**
     * Returns the element count reported by the txn bucket.
     *
     * @throws Exception if the bucket cannot report its count
     */
    public int countOfTxn() throws Exception {
        return this.bucketGroup.txnBucket.count();
    }

    /** Marks the component closed; later locked operations fail with {@link IllegalAccessException}. */
    public void shutdown() {
        this.isOpen = false;
    }

    /** Reads the persisted {@link TxnQueueInfo}, falling back to a fresh instance when none is stored. */
    private void loadInfo() throws IOException {
        TxnQueueInfo stored = LevelDBBucket.get(this.db, this.buildTxnQueueInfoKey(), TxnQueueInfo.class);
        this.txnQueueInfo = (null != stored) ? stored : new TxnQueueInfo();
    }

    /** Persists the current {@link TxnQueueInfo}. */
    private void updateInfo() throws IOException {
        LevelDBBucket.set(this.db, this.buildTxnQueueInfoKey(), (Serializable) this.txnQueueInfo);
    }

    /**
     * Loads the txn bucket. When the queue info carried no txn bucket id, the
     * id assigned during loading is persisted with the queue info.
     */
    private void loadBuckets() throws IOException {
        // Must be evaluated before loadTxnBucket, which fills in the id on first load.
        boolean isFirstLoad = !StringHelper.isNotEmpty(this.txnQueueInfo.getTxnBucketId());
        this.bucketGroup.loadTxnBucket(this.db, this.txnQueueInfo.getTxnBucketId());
        if (isFirstLoad) {
            this.updateInfo();
        }
    }

    public Integer getTxnCapacity() {
        return this.txnCapacity;
    }

    public void setTxnCapacity(Integer txnCapacity) {
        this.txnCapacity = txnCapacity;
    }

    public Integer getFailureCapacity() {
        return this.failureCapacity;
    }

    /**
     * @param failureCapacity new failure capacity
     * @throws IllegalArgumentException if the value is below {@code txnCapacity * 5}
     */
    public void setFailureCapacity(Integer failureCapacity) {
        if (failureCapacity < this.txnCapacity * 5) {
            throw new IllegalArgumentException("failureCapacity need be more or equal than txnCapacity * 5");
        }
        this.failureCapacity = failureCapacity;
    }

    public Integer getDiscardCapacity() {
        return this.discardCapacity;
    }

    /**
     * @param discardCapacity new discard capacity
     * @throws IllegalArgumentException if the value is below {@code txnCapacity * 5}
     */
    public void setDiscardCapacity(Integer discardCapacity) {
        if (discardCapacity < this.txnCapacity * 5) {
            throw new IllegalArgumentException("discardCapacity need be more or equal than txnCapacity * 5");
        }
        this.discardCapacity = discardCapacity;
    }

    public Integer getRetryCount() {
        return this.retryCount;
    }

    public void setRetryCount(Integer retryCount) {
        this.retryCount = retryCount;
    }

    public boolean isOpenRetry() {
        return this.openRetry;
    }

    public void setOpenRetry(boolean openRetry) {
        this.openRetry = openRetry;
    }

    /**
     * Walks every index of the txn bucket and hands each status that still
     * has a stored event to {@code handler}. Entries whose status or event is
     * missing are removed instead.
     *
     * @param handler receives each resumable status together with its event; must not be null
     * @throws IllegalAccessException if a status has already been created through {@code getTxnStatus}
     * @throws IllegalArgumentException if {@code handler} is null
     * @throws IOException if the bucket iteration fails
     */
    public void resumeTxn(ResumeTxnHandler handler) throws IllegalAccessException, IOException {
        this.bucketGroup.resumeTxn(handler);
    }

    /**
     * Returns the status stored for {@code txnId} and {@code listenerType},
     * creating and writing a new one when none exists.
     */
    public EventTxnStatus getTxnStatus(String eventId, Class<? extends EventListener> listenerType, String txnId) throws Exception {
        return this.bucketGroup.getTxnStatus(eventId, listenerType, txnId, null);
    }

    /**
     * Same as {@link #getTxnStatus(String, Class, String)}, but when
     * {@code writeBatch} is non-null a newly created status is only staged in
     * that batch; writing the batch is left to the caller.
     */
    public EventTxnStatus getTxnStatus(String eventId, Class<? extends EventListener> listenerType, String txnId, WriteBatch writeBatch) throws Exception {
        return this.bucketGroup.getTxnStatus(eventId, listenerType, txnId, writeBatch);
    }

    /**
     * Removes the given status and, when no status references its
     * {@code txnId} any more, the stored event and reference record.
     *
     * @param txnStatus a status previously returned by {@code getTxnStatus}
     * @throws IllegalAccessException if the component is closed or the status is not in the txn bucket
     */
    public void commit(EventTxnStatus txnStatus) throws Exception {
        this.bucketGroup.commit(txnStatus);
    }

    /**
     * Writes {@code wb} to the database when the component is open and always
     * closes the batch afterwards, including when the component is closed or
     * the write fails.
     */
    private void updateDB(WriteBatch wb) throws IOException {
        try {
            if (this.isOpen) {
                this.db.write(wb);
            }
        } finally {
            wb.close();
        }
    }

    /** Reads the event stored under {@code queueName + "_" + txnId}; the result may be null. */
    CommonEventSource getEvent(String txnId) throws IOException {
        return LevelDBBucket.get(this.db, this.wrapperKey(txnId), CommonEventSource.class);
    }

    /** Stages deletion of the event and of the {@link TxnRef} belonging to {@code txnId}. */
    private void deleteEvent(WriteBatch wb, String txnId) {
        LevelDBBucket.delete(wb, this.wrapperKey(txnId));
        LevelDBBucket.delete(wb, this.buildTxnRefKey(txnId));
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("delete event:" + txnId);
        }
    }

    /** Prefixes {@code key} with the queue name. */
    String wrapperKey(String key) {
        return this.queueName + "_" + key;
    }

    /** Key of the {@link TxnRef} for {@code txnId}. */
    String buildTxnRefKey(String txnId) {
        return this.indexPrefix + "_ref_" + txnId;
    }

    /** Key (and bucket index) of the status identified by {@code bucketTxnId}. */
    String buildTxnKey(String bucketTxnId) {
        return this.indexPrefix + "_" + bucketTxnId;
    }

    private String buildTxnPrefix() {
        return "txn_" + this.queueName;
    }

    /** Inverse of {@link #buildTxnKey(String)}: strips the prefix and the separator. */
    private String getBucketTxnIdFromIndex(String index) {
        return index.substring(this.indexPrefix.length() + 1);
    }

    /**
     * Runs {@code invoker} while holding the component lock.
     *
     * @throws IllegalAccessException if the component is not open
     * @throws Exception whatever the invoker throws
     */
    private <T> T lockAndInvoke(LockInvoker<T> invoker) throws Exception {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            if (!this.isOpen) {
                throw new IllegalAccessException("queue had been closed");
            }
            return invoker.lockAndInvoke();
        } finally {
            guard.unlock();
        }
    }

    /** Currently does nothing. */
    public void houseKeeping() throws IOException {
    }

    /**
     * Holds the buckets of the enclosing queue and implements the status
     * bookkeeping on top of them.
     */
    class BucketGroup {
        LevelDBBucket txnBucket;
        LevelDBBucket failureBucket;
        LevelDBBucket discardBucket;
        /** Becomes true once {@code getTxnStatus} had to create a status; blocks {@code resumeTxn} afterwards. */
        private volatile boolean invokedGetTxnStatus = false;

        BucketGroup() {
        }

        private void setInvokedGetTxnStatus(boolean invokedGetTxnStatus) {
            this.invokedGetTxnStatus = invokedGetTxnStatus;
        }

        /** Page size used for a bucket of the given capacity: one tenth of it, plus one. */
        private int calculateMaxPageCount(int elementCount) {
            return elementCount / 10 + 1;
        }

        /**
         * Builds the txn bucket (cache enabled). When {@code id} is empty the
         * id of the built bucket is recorded in the queue info.
         */
        public void loadTxnBucket(DB db, String id) throws IOException {
            LevelDBBucket.Profile profile = new LevelDBBucket.Profile();
            profile.setOpenCache(true);
            profile.setMaxCountOfPage(this.calculateMaxPageCount(TxnQueueComponent.this.txnCapacity));
            this.txnBucket = this.buildBucket(db, profile, id);
            if (StringHelper.isEmpty(id)) {
                TxnQueueComponent.this.txnQueueInfo.setTxnBucketId(this.txnBucket.getId());
            }
            if (TxnQueueComponent.this.logger.isDebugEnabled()) {
                TxnQueueComponent.this.logger.debug("txnBucket's id:" + this.txnBucket.getId());
            }
        }

        /**
         * Builds the failure bucket (cache disabled). When {@code id} is empty
         * the id of the built bucket is recorded in the queue info.
         */
        public void loadFailureBucket(DB db, String id) throws IOException {
            LevelDBBucket.Profile profile = new LevelDBBucket.Profile();
            profile.setOpenCache(false);
            profile.setMaxCountOfPage(this.calculateMaxPageCount(TxnQueueComponent.this.failureCapacity));
            this.failureBucket = this.buildBucket(db, profile, id);
            if (StringHelper.isEmpty(id)) {
                TxnQueueComponent.this.txnQueueInfo.setFailureBucketId(this.failureBucket.getId());
            }
            if (TxnQueueComponent.this.logger.isDebugEnabled()) {
                // Log this bucket's own id (not the txn bucket's, which may not even be loaded yet).
                TxnQueueComponent.this.logger.debug("failureBucket's id:" + this.failureBucket.getId());
            }
        }

        /**
         * Builds the discard bucket (cache disabled). When {@code id} is empty
         * the id of the built bucket is recorded in the queue info.
         */
        public void loadDiscardBucket(DB db, String id) throws IOException {
            LevelDBBucket.Profile profile = new LevelDBBucket.Profile();
            profile.setOpenCache(false);
            profile.setMaxCountOfPage(this.calculateMaxPageCount(TxnQueueComponent.this.discardCapacity));
            this.discardBucket = this.buildBucket(db, profile, id);
            if (StringHelper.isEmpty(id)) {
                TxnQueueComponent.this.txnQueueInfo.setDiscardBucketId(this.discardBucket.getId());
            }
            if (TxnQueueComponent.this.logger.isDebugEnabled()) {
                // Log this bucket's own id (not the txn bucket's, which may not even be loaded yet).
                TxnQueueComponent.this.logger.debug("discardBucket's id:" + this.discardBucket.getId());
            }
        }

        /** Builds a bucket for this queue from the given id and profile settings. */
        private LevelDBBucket buildBucket(DB db, LevelDBBucket.Profile profile, String id) throws IOException {
            return new LevelDBBucket.Builder(db)
                    .id(id)
                    .queueName(TxnQueueComponent.this.queueName)
                    .maxCountOfPage(profile.getMaxCountOfPage())
                    .openCache(profile.isOpenCache())
                    .build();
        }

        /**
         * Creates a status in a batch of its own and writes that batch.
         *
         * <p>The batch is handed to {@code updateDB} in a {@code finally}
         * block, so whatever was staged before a failure is still written
         * while the component is open.
         */
        LevelDBEventTxnStatus createTxnStatus(String eventId, Class<? extends EventListener> listenerType, String txnId) throws Exception {
            WriteBatch writeBatch = TxnQueueComponent.this.db.createWriteBatch();
            try {
                return this.createTxnStatus(eventId, listenerType, txnId, writeBatch);
            } finally {
                TxnQueueComponent.this.updateDB(writeBatch);
            }
        }

        /**
         * Creates a new, incomplete status with a random bucket txn id and
         * stages three things in {@code writeBatch}: its index in the txn
         * bucket, the status itself, and the {@link TxnRef} of {@code txnId}
         * extended by the new id. The batch is not written here.
         *
         * <p>The {@link TxnRef} is read from the database, not from the batch,
         * so changes still pending in {@code writeBatch} are not seen.
         */
        LevelDBEventTxnStatus createTxnStatus(String eventId, Class<? extends EventListener> listenerType, String txnId, final WriteBatch writeBatch) throws Exception {
            final LevelDBEventTxnStatus status = new LevelDBEventTxnStatus();
            status.setBucketId(TxnQueueComponent.this.txnQueueInfo.getTxnBucketId());
            status.setComplete(false);
            status.setStart(new Date());
            status.setTxnId(txnId);
            status.setBucketTxnId(UUID.randomUUID().toString());
            status.setEventId(eventId);
            status.setListenerType(listenerType);
            return TxnQueueComponent.this.lockAndInvoke(new LockInvoker<LevelDBEventTxnStatus>() {

                @Override
                public LevelDBEventTxnStatus lockAndInvoke() throws Exception {
                    String index = TxnQueueComponent.this.buildTxnKey(status.getBucketTxnId());
                    status.setPageNo(BucketGroup.this.txnBucket.savePageIndex(writeBatch, index));
                    BucketGroup.this.saveStatus(writeBatch, status);
                    TxnRef txnRef = BucketGroup.this.queryTxnRef(status.getTxnId());
                    if (null == txnRef) {
                        txnRef = new TxnRef();
                        txnRef.setTxnId(status.getTxnId());
                    }
                    txnRef.getBucketTxnIds().add(status.getBucketTxnId());
                    txnRef.increaseTxnCount();
                    BucketGroup.this.saveTxnRef(writeBatch, txnRef);
                    if (TxnQueueComponent.this.logger.isTraceEnabled()) {
                        TxnQueueComponent.this.logger.trace("create event txn:" + index + " for " + status.getListenerType().getName() + " success");
                    }
                    return status;
                }
            });
        }

        /** Stages {@code status} under its status key. */
        private void saveStatus(final WriteBatch wb, final LevelDBEventTxnStatus status) throws Exception {
            TxnQueueComponent.this.lockAndInvoke(new LockInvoker<Void>() {

                @Override
                public Void lockAndInvoke() throws Exception {
                    LevelDBBucket.set(wb, TxnQueueComponent.this.buildTxnKey(status.getBucketTxnId()), (Serializable) status);
                    return null;
                }
            });
        }

        /** Stages {@code txnRef} under the reference key of its txn id. */
        private void saveTxnRef(final WriteBatch wb, final TxnRef txnRef) throws Exception {
            TxnQueueComponent.this.lockAndInvoke(new LockInvoker<Void>() {

                @Override
                public Void lockAndInvoke() throws Exception {
                    LevelDBBucket.set(wb, TxnQueueComponent.this.buildTxnRefKey(txnRef.getTxnId()), (Serializable) txnRef);
                    return null;
                }
            });
        }

        /** Reads the {@link TxnRef} of {@code txnId} from the database; may be null. */
        private TxnRef queryTxnRef(String txnId) throws IOException {
            return LevelDBBucket.get(TxnQueueComponent.this.db, TxnQueueComponent.this.buildTxnRefKey(txnId), TxnRef.class);
        }

        /**
         * Returns the existing status for {@code txnId}/{@code listenerType}
         * or creates one. With a non-null {@code writeBatch} the new status is
         * only staged in it; otherwise it is written immediately.
         */
        EventTxnStatus getTxnStatus(String eventId, Class<? extends EventListener> listenerType, String txnId, WriteBatch writeBatch) throws Exception {
            LevelDBEventTxnStatus existing = this.findStatus(txnId, listenerType);
            if (null != existing) {
                return existing;
            }
            if (!this.invokedGetTxnStatus) {
                this.setInvokedGetTxnStatus(true);
            }
            if (null != writeBatch) {
                return this.createTxnStatus(eventId, listenerType, txnId, writeBatch);
            }
            return this.createTxnStatus(eventId, listenerType, txnId);
        }

        /** Removes {@code index} from the txn bucket and writes the change immediately. */
        private void removeTxnBucketIndex(String index) throws IOException, IllegalAccessException {
            WriteBatch wb = TxnQueueComponent.this.db.createWriteBatch();
            try {
                this.txnBucket.popByIndex(wb, index);
                TxnQueueComponent.this.db.write(wb);
            } finally {
                wb.close();
            }
        }

        /**
         * Pops the status' index from the txn bucket, deletes the status and,
         * when it was the last one referencing its txn id, the event and the
         * reference record. Everything is written in a single batch.
         *
         * <p>The batch is handed to {@code updateDB} in a {@code finally}
         * block, so whatever was staged before a failure is still written
         * while the component is open.
         *
         * @throws IllegalAccessException if the index is not in the txn bucket
         */
        void commit(EventTxnStatus txnStatus) throws Exception {
            final LevelDBEventTxnStatus status = (LevelDBEventTxnStatus) txnStatus;
            TxnQueueComponent.this.lockAndInvoke(new LockInvoker<Void>() {

                @Override
                public Void lockAndInvoke() throws Exception {
                    WriteBatch writeBatch = TxnQueueComponent.this.db.createWriteBatch();
                    try {
                        String index = TxnQueueComponent.this.buildTxnKey(status.getBucketTxnId());
                        index = BucketGroup.this.txnBucket.popByIndex(writeBatch, index);
                        if (null == index) {
                            throw new IllegalAccessException("txn can't be found or may had been commit!");
                        }
                        if (BucketGroup.this.deleteStatus(writeBatch, status)) {
                            TxnQueueComponent.this.deleteEvent(writeBatch, status.getTxnId());
                        }
                        if (TxnQueueComponent.this.logger.isTraceEnabled()) {
                            TxnQueueComponent.this.logger.trace("commit event txn:" + status.getTxnId() + " for " + status.getListenerType().getName() + " success");
                        }
                    } finally {
                        TxnQueueComponent.this.updateDB(writeBatch);
                    }
                    return null;
                }
            });
        }

        /**
         * Iterates the txn bucket's indexes. For each index:
         * <ul>
         *   <li>no stored status: the index is removed;</li>
         *   <li>status but no readable event: index and status are removed,
         *       and the event/reference keys are deleted as well when reading
         *       the event failed or no other status references the txn id;</li>
         *   <li>otherwise the status and event are passed to {@code handler}.</li>
         * </ul>
         *
         * @throws IllegalAccessException if a status has already been created through {@code getTxnStatus}
         * @throws IllegalArgumentException if {@code handler} is null
         */
        void resumeTxn(final ResumeTxnHandler handler) throws IllegalAccessException, IOException {
            if (this.invokedGetTxnStatus) {
                throw new IllegalAccessException("it must be invoked before first getTxnStatus");
            }
            if (null == handler) {
                throw new IllegalArgumentException("please set ResumeTxnHandler parameter");
            }
            this.txnBucket.iterateIndex(new IndexIterator() {

                @Override
                public void iterateIndex(String index, int pageNo) throws Exception {
                    String bucketTxnId = TxnQueueComponent.this.getBucketTxnIdFromIndex(index);
                    LevelDBEventTxnStatus txn = BucketGroup.this.findStatusByBucketTxnId(bucketTxnId);
                    if (null == txn) {
                        TxnQueueComponent.this.logger.warn("can't find txn[" + bucketTxnId + "]");
                        BucketGroup.this.removeTxnBucketIndex(index);
                        return;
                    }
                    CommonEventSource event = null;
                    boolean fail2delete = false;
                    try {
                        event = TxnQueueComponent.this.getEvent(txn.getTxnId());
                    } catch (Exception e) {
                        TxnQueueComponent.this.logger.error("getEvent error, then it would direct to delete event:" + e.getMessage(), e);
                        fail2delete = true;
                    }
                    if (null != event) {
                        handler.resume(txn, event);
                        return;
                    }
                    TxnQueueComponent.this.logger.warn("can't find event[" + bucketTxnId + "]");
                    WriteBatch wb = TxnQueueComponent.this.db.createWriteBatch();
                    try {
                        BucketGroup.this.txnBucket.popByIndex(wb, index);
                        boolean lastReference = BucketGroup.this.deleteStatus(wb, txn);
                        // Mirror commit(): once nothing references the txn id, drop the
                        // reference record too instead of leaving it orphaned.
                        if (fail2delete || lastReference) {
                            TxnQueueComponent.this.deleteEvent(wb, txn.getTxnId());
                        }
                        TxnQueueComponent.this.db.write(wb);
                    } finally {
                        wb.close();
                    }
                }
            });
        }

        /**
         * Stages deletion of {@code status} and updates the {@link TxnRef} of
         * its txn id.
         *
         * @return {@code true} when no reference record exists or no status
         *         references the txn id any more (the caller should then
         *         delete the event and the reference record); {@code false}
         *         when other statuses remain, in which case the reduced
         *         reference record has been staged in {@code wb}
         */
        private boolean deleteStatus(final WriteBatch wb, final LevelDBEventTxnStatus status) throws Exception {
            return TxnQueueComponent.this.lockAndInvoke(new LockInvoker<Boolean>() {

                @Override
                public Boolean lockAndInvoke() throws Exception {
                    LevelDBBucket.delete(wb, TxnQueueComponent.this.buildTxnKey(status.getBucketTxnId()));
                    TxnRef txnRef = BucketGroup.this.queryTxnRef(status.getTxnId());
                    if (null == txnRef) {
                        return true;
                    }
                    txnRef.decreaseTxnCount();
                    txnRef.getBucketTxnIds().remove(status.getBucketTxnId());
                    if (txnRef.getTxnCount() <= 0) {
                        return true;
                    }
                    // Persist the reduced reference; otherwise the next delete re-reads the
                    // old count from the database and the count can never reach zero.
                    BucketGroup.this.saveTxnRef(wb, txnRef);
                    return false;
                }
            });
        }

        /**
         * Looks up the status of {@code txnId} whose listener type is
         * {@code listenerType}.
         *
         * @return the matching status, or null when there is no reference
         *         record or no referenced status matches
         */
        LevelDBEventTxnStatus findStatus(String txnId, Class<? extends EventListener> listenerType) throws IOException {
            TxnRef txnRef = this.queryTxnRef(txnId);
            if (null == txnRef) {
                return null;
            }
            for (String bucketTxnId : txnRef.getBucketTxnIds()) {
                LevelDBEventTxnStatus candidate = this.findStatusByBucketTxnId(bucketTxnId);
                // A referenced status may no longer be stored; skip it rather than fail.
                if (null != candidate && candidate.getListenerType() == listenerType) {
                    return candidate;
                }
            }
            return null;
        }

        /** Reads the status stored for {@code bucketTxnId}; may be null. */
        LevelDBEventTxnStatus findStatusByBucketTxnId(String bucketTxnId) throws IOException {
            return LevelDBBucket.get(TxnQueueComponent.this.db, TxnQueueComponent.this.buildTxnKey(bucketTxnId), LevelDBEventTxnStatus.class);
        }
    }
}

