/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedLedgerFactoryImpl
implements ManagedLedgerFactory {
    private final MetaStore store;
    private final BookKeeper bookKeeper;
    private final boolean isBookkeeperManaged;
    private final ZooKeeper zookeeper;
    private final ManagedLedgerFactoryConfig config;
    protected final OrderedScheduler scheduledExecutor = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(16).name("bookkeeper-ml-scheduler").build();
    private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(16).name("bookkeeper-ml-workers").build();
    protected final ManagedLedgerFactoryMBeanImpl mbean;
    protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap();
    private final EntryCacheManager entryCacheManager;
    private long lastStatTimestamp = System.nanoTime();
    private final ScheduledFuture<?> statsTask;
    private static final int StatsPeriodSeconds = 60;
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);

    public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration) throws Exception {
        this(bkClientConfiguration, new ManagedLedgerFactoryConfig());
    }

    public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, ManagedLedgerFactoryConfig config) throws Exception {
        CountDownLatch counter = new CountDownLatch(1);
        String zookeeperQuorum = (String)Preconditions.checkNotNull((Object)bkClientConfiguration.getZkServers());
        this.zookeeper = new ZooKeeper(zookeeperQuorum, bkClientConfiguration.getZkTimeout(), event -> {
            if (event.getState().equals((Object)Watcher.Event.KeeperState.SyncConnected)) {
                log.info("Connected to zookeeper");
                counter.countDown();
            } else {
                log.error("Error connecting to zookeeper {}", (Object)event);
            }
        });
        if (!counter.await(bkClientConfiguration.getZkTimeout(), TimeUnit.MILLISECONDS) || this.zookeeper.getState() != ZooKeeper.States.CONNECTED) {
            throw new ManagedLedgerException("Error connecting to ZooKeeper at '" + zookeeperQuorum + "'");
        }
        this.bookKeeper = new BookKeeper(bkClientConfiguration, this.zookeeper);
        this.isBookkeeperManaged = true;
        this.store = new MetaStoreImplZookeeper(this.zookeeper, this.orderedExecutor);
        this.config = config;
        this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
        this.entryCacheManager = new EntryCacheManager(this);
        this.statsTask = this.scheduledExecutor.scheduleAtFixedRate(() -> this.refreshStats(), 0L, 60L, TimeUnit.SECONDS);
    }

    public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
        this(bookKeeper, zooKeeper, new ManagedLedgerFactoryConfig());
    }

    public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config) throws Exception {
        this.bookKeeper = bookKeeper;
        this.isBookkeeperManaged = false;
        this.zookeeper = null;
        this.store = new MetaStoreImplZookeeper(zooKeeper, this.orderedExecutor);
        this.config = config;
        this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
        this.entryCacheManager = new EntryCacheManager(this);
        this.statsTask = this.scheduledExecutor.scheduleAtFixedRate(() -> this.refreshStats(), 0L, 60L, TimeUnit.SECONDS);
    }

    private synchronized void refreshStats() {
        long now = System.nanoTime();
        long period = now - this.lastStatTimestamp;
        this.mbean.refreshStats(period, TimeUnit.NANOSECONDS);
        this.ledgers.values().forEach(mlfuture -> {
            ManagedLedgerImpl ml = mlfuture.getNow(null);
            if (ml != null) {
                ml.mbean.refreshStats(period, TimeUnit.NANOSECONDS);
            }
        });
        this.lastStatTimestamp = now;
    }

    public Map<String, ManagedLedgerImpl> getManagedLedgers() {
        return Maps.filterValues((Map)Maps.transformValues(this.ledgers, future -> future.getNow(null)), (Predicate)Predicates.notNull());
    }

    @Override
    public ManagedLedger open(String name) throws InterruptedException, ManagedLedgerException {
        return this.open(name, new ManagedLedgerConfig());
    }

    @Override
    public ManagedLedger open(String name, ManagedLedgerConfig config) throws InterruptedException, ManagedLedgerException {
        class Result {
            ManagedLedger l = null;
            ManagedLedgerException e = null;

            Result() {
            }
        }
        final Result r = new Result();
        final CountDownLatch latch = new CountDownLatch(1);
        this.asyncOpen(name, config, new AsyncCallbacks.OpenLedgerCallback(){
            {
            }

            @Override
            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                r.l = ledger;
                latch.countDown();
            }

            @Override
            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                r.e = exception;
                latch.countDown();
            }
        }, null);
        latch.await();
        if (r.e != null) {
            throw r.e;
        }
        return r.l;
    }

    @Override
    public void asyncOpen(String name, AsyncCallbacks.OpenLedgerCallback callback, Object ctx) {
        this.asyncOpen(name, new ManagedLedgerConfig(), callback, ctx);
    }

    @Override
    public void asyncOpen(final String name, ManagedLedgerConfig config, AsyncCallbacks.OpenLedgerCallback callback, Object ctx) {
        CompletableFuture<ManagedLedgerImpl> existingFuture = this.ledgers.get(name);
        if (existingFuture != null && existingFuture.isDone()) {
            try {
                ManagedLedgerImpl l = existingFuture.get();
                if (l.getState().equals(ManagedLedgerImpl.State.Fenced.toString()) || l.getState().equals(ManagedLedgerImpl.State.Closed.toString())) {
                    log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", (Object)name, (Object)l.getState());
                    this.ledgers.remove(name, existingFuture);
                }
            }
            catch (Exception e) {
                log.warn("[{}] Got exception while trying to retrieve ledger", (Object)name, (Object)e);
            }
        }
        ((CompletableFuture)this.ledgers.computeIfAbsent(name, mlName -> {
            final CompletableFuture future = new CompletableFuture();
            final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, this.bookKeeper, this.store, config, this.scheduledExecutor, this.orderedExecutor, name);
            newledger.initialize(new ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback(){

                @Override
                public void initializeComplete() {
                    future.complete(newledger);
                }

                @Override
                public void initializeFailed(ManagedLedgerException e) {
                    ManagedLedgerFactoryImpl.this.ledgers.remove(name, future);
                    future.completeExceptionally(e);
                }
            }, null);
            return future;
        }).thenAccept(ml -> callback.openLedgerComplete((ManagedLedger)ml, ctx))).exceptionally(exception -> {
            callback.openLedgerFailed((ManagedLedgerException)exception.getCause(), ctx);
            return null;
        });
    }

    void close(ManagedLedger ledger) {
        this.ledgers.remove(ledger.getName());
        this.entryCacheManager.removeEntryCache(ledger.getName());
    }

    @Override
    public void shutdown() throws InterruptedException, ManagedLedgerException {
        this.statsTask.cancel(true);
        int numLedgers = this.ledgers.size();
        final CountDownLatch latch = new CountDownLatch(numLedgers);
        log.info("Closing {} ledgers", (Object)numLedgers);
        for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : this.ledgers.values()) {
            final ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
            if (ledger == null) continue;
            ledger.asyncClose(new AsyncCallbacks.CloseCallback(){

                @Override
                public void closeComplete(Object ctx) {
                    latch.countDown();
                }

                @Override
                public void closeFailed(ManagedLedgerException exception, Object ctx) {
                    log.warn("[{}] Got exception when closing managed ledger: {}", (Object)ledger.getName(), (Object)exception);
                    latch.countDown();
                }
            }, null);
        }
        latch.await();
        log.info("{} ledgers closed", (Object)numLedgers);
        if (this.zookeeper != null) {
            this.zookeeper.close();
        }
        if (this.isBookkeeperManaged) {
            try {
                this.bookKeeper.close();
            }
            catch (BKException e) {
                throw new ManagedLedgerException(e);
            }
        }
        this.scheduledExecutor.shutdown();
        this.orderedExecutor.shutdown();
        this.entryCacheManager.clear();
    }

    @Override
    public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException {
        class Result {
            ManagedLedgerInfo info = null;
            ManagedLedgerException e = null;

            Result() {
            }
        }
        final Result r = new Result();
        final CountDownLatch latch = new CountDownLatch(1);
        this.asyncGetManagedLedgerInfo(name, new AsyncCallbacks.ManagedLedgerInfoCallback(){
            {
            }

            @Override
            public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
                r.info = info;
                latch.countDown();
            }

            @Override
            public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
                r.e = exception;
                latch.countDown();
            }
        }, null);
        latch.await();
        if (r.e != null) {
            throw r.e;
        }
        return r.info;
    }

    @Override
    public void asyncGetManagedLedgerInfo(final String name, final AsyncCallbacks.ManagedLedgerInfoCallback callback, final Object ctx) {
        this.store.getManagedLedgerInfo(name, false, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>(){

            @Override
            public void operationComplete(MLDataFormats.ManagedLedgerInfo pbInfo, MetaStore.Stat stat) {
                final ManagedLedgerInfo info = new ManagedLedgerInfo();
                info.version = stat.getVersion();
                info.creationDate = DateFormatter.format((long)stat.getCreationTimestamp());
                info.modificationDate = DateFormatter.format((long)stat.getModificationTimestamp());
                info.ledgers = new ArrayList<ManagedLedgerInfo.LedgerInfo>(pbInfo.getLedgerInfoCount());
                if (pbInfo.hasTerminatedPosition()) {
                    info.terminatedPosition = new ManagedLedgerInfo.PositionInfo();
                    info.terminatedPosition.ledgerId = pbInfo.getTerminatedPosition().getLedgerId();
                    info.terminatedPosition.entryId = pbInfo.getTerminatedPosition().getEntryId();
                }
                for (int i = 0; i < pbInfo.getLedgerInfoCount(); ++i) {
                    MLDataFormats.ManagedLedgerInfo.LedgerInfo pbLedgerInfo = pbInfo.getLedgerInfo(i);
                    ManagedLedgerInfo.LedgerInfo ledgerInfo = new ManagedLedgerInfo.LedgerInfo();
                    ledgerInfo.ledgerId = pbLedgerInfo.getLedgerId();
                    ledgerInfo.entries = pbLedgerInfo.hasEntries() ? Long.valueOf(pbLedgerInfo.getEntries()) : null;
                    ledgerInfo.size = pbLedgerInfo.hasSize() ? Long.valueOf(pbLedgerInfo.getSize()) : null;
                    info.ledgers.add(ledgerInfo);
                }
                ManagedLedgerFactoryImpl.this.store.getCursors(name, new MetaStore.MetaStoreCallback<List<String>>(){

                    @Override
                    public void operationComplete(List<String> cursorsList, MetaStore.Stat stat) {
                        info.cursors = new ConcurrentSkipListMap<String, ManagedLedgerInfo.CursorInfo>();
                        ArrayList<CompletableFuture<Void>> cursorsFutures = new ArrayList<CompletableFuture<Void>>();
                        for (final String cursorName : cursorsList) {
                            final CompletableFuture cursorFuture = new CompletableFuture();
                            cursorsFutures.add(cursorFuture);
                            ManagedLedgerFactoryImpl.this.store.asyncGetCursorInfo(name, cursorName, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>(){

                                @Override
                                public void operationComplete(MLDataFormats.ManagedCursorInfo pbCursorInfo, MetaStore.Stat stat) {
                                    int i;
                                    ManagedLedgerInfo.CursorInfo cursorInfo = new ManagedLedgerInfo.CursorInfo();
                                    cursorInfo.version = stat.getVersion();
                                    cursorInfo.creationDate = DateFormatter.format((long)stat.getCreationTimestamp());
                                    cursorInfo.modificationDate = DateFormatter.format((long)stat.getModificationTimestamp());
                                    cursorInfo.cursorsLedgerId = pbCursorInfo.getCursorsLedgerId();
                                    if (pbCursorInfo.hasMarkDeleteLedgerId()) {
                                        cursorInfo.markDelete = new ManagedLedgerInfo.PositionInfo();
                                        cursorInfo.markDelete.ledgerId = pbCursorInfo.getMarkDeleteLedgerId();
                                        cursorInfo.markDelete.entryId = pbCursorInfo.getMarkDeleteEntryId();
                                    }
                                    if (pbCursorInfo.getPropertiesCount() > 0) {
                                        cursorInfo.properties = Maps.newTreeMap();
                                        for (i = 0; i < pbCursorInfo.getPropertiesCount(); ++i) {
                                            MLDataFormats.LongProperty property = pbCursorInfo.getProperties(i);
                                            cursorInfo.properties.put(property.getName(), property.getValue());
                                        }
                                    }
                                    if (pbCursorInfo.getIndividualDeletedMessagesCount() > 0) {
                                        cursorInfo.individualDeletedMessages = new ArrayList<ManagedLedgerInfo.MessageRangeInfo>();
                                        for (i = 0; i < pbCursorInfo.getIndividualDeletedMessagesCount(); ++i) {
                                            MLDataFormats.MessageRange range = pbCursorInfo.getIndividualDeletedMessages(i);
                                            ManagedLedgerInfo.MessageRangeInfo rangeInfo = new ManagedLedgerInfo.MessageRangeInfo();
                                            rangeInfo.from.ledgerId = range.getLowerEndpoint().getLedgerId();
                                            rangeInfo.from.entryId = range.getLowerEndpoint().getEntryId();
                                            rangeInfo.to.ledgerId = range.getUpperEndpoint().getLedgerId();
                                            rangeInfo.to.entryId = range.getUpperEndpoint().getEntryId();
                                            cursorInfo.individualDeletedMessages.add(rangeInfo);
                                        }
                                    }
                                    info.cursors.put(cursorName, cursorInfo);
                                    cursorFuture.complete(null);
                                }

                                @Override
                                public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                                    cursorFuture.completeExceptionally(e);
                                }
                            });
                        }
                        ((CompletableFuture)Futures.waitForAll(cursorsFutures).thenRun(() -> callback.getInfoComplete(info, ctx))).exceptionally(ex -> {
                            callback.getInfoFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
                            return null;
                        });
                    }

                    @Override
                    public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                        callback.getInfoFailed(e, ctx);
                    }
                });
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                callback.getInfoFailed(e, ctx);
            }
        });
    }

    public MetaStore getMetaStore() {
        return this.store;
    }

    public ManagedLedgerFactoryConfig getConfig() {
        return this.config;
    }

    public EntryCacheManager getEntryCacheManager() {
        return this.entryCacheManager;
    }

    public ManagedLedgerFactoryMXBean getCacheStats() {
        return this.mbean;
    }

    public BookKeeper getBookKeeper() {
        return this.bookKeeper;
    }
}

