package org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.store;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.coder.ByteArrayCoder;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.exceptions.ObjectClosedException;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.SharedResourceManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.StateStores;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.protocol.RangeId;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.StorageResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/shade/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.class */
/**
 * {@link MVCCStoreFactory} implementation that opens MVCC byte[]/byte[] stores on demand and
 * caches them per storage container, keyed by {@link RangeId}.
 *
 * <p>Throughout this class the three {@code long} identifiers are, in order: the storage
 * container id ({@code j}), the stream id ({@code j2}) and the range id ({@code j3}).
 *
 * <p>The store cache, the {@code closed} flag and the lazily created checkpoint store are only
 * accessed while holding this instance's monitor.
 */
public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
    private static final Logger log = LoggerFactory.getLogger(MVCCStoreFactoryImpl.class);

    /** Checkpoint interval configured on every store spec built by this factory. */
    private static final Duration CHECKPOINT_DURATION = Duration.ofMinutes(15L);

    private final Supplier<MVCCAsyncStore<byte[], byte[]>> storeSupplier;
    private final StorageResources storageResources;
    private final OrderedScheduler writeIOScheduler;
    private final OrderedScheduler readIOScheduler;
    private final OrderedScheduler checkpointScheduler;
    private final File[] localStateDirs;
    private final Supplier<CheckpointStore> checkpointStoreSupplier;
    /** Created lazily by the first {@link #newStore}; closed and reset to null by {@link #close()}. */
    private CheckpointStore checkpointStore;
    private final boolean serveReadOnlyTable;
    private boolean closed = false;
    /** storage container id -> (range id -> opened store). */
    private final Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> stores = Maps.newHashMap();

    /**
     * Creates the factory and acquires the write, read and checkpoint schedulers from the
     * shared resource manager; they are released again in {@link #close()}.
     *
     * @param supplier         namespace supplier used to build the MVCC store supplier
     * @param supplier2        supplier of the checkpoint store, invoked lazily on first store creation
     * @param fileArr          local state directories that stores are spread across
     * @param storageResources descriptors of the shared schedulers to acquire
     * @param z                whether stores are initialized as read-only
     */
    public MVCCStoreFactoryImpl(Supplier<Namespace> supplier, Supplier<CheckpointStore> supplier2, File[] fileArr, StorageResources storageResources, boolean z) {
        this.storeSupplier = StateStores.mvccKvBytesStoreSupplier(supplier);
        this.storageResources = storageResources;
        this.writeIOScheduler = (OrderedScheduler) SharedResourceManager.shared().get(storageResources.ioWriteScheduler());
        this.readIOScheduler = (OrderedScheduler) SharedResourceManager.shared().get(storageResources.ioReadScheduler());
        this.checkpointScheduler = (OrderedScheduler) SharedResourceManager.shared().get(storageResources.checkpointScheduler());
        this.localStateDirs = fileArr;
        this.checkpointStoreSupplier = supplier2;
        this.serveReadOnlyTable = z;
    }

    private ScheduledExecutorService chooseWriteIOExecutor(long streamId) {
        return this.writeIOScheduler.chooseThread(streamId);
    }

    private ScheduledExecutorService chooseReadIOExecutor(long streamId) {
        return this.readIOScheduler.chooseThread(streamId);
    }

    private ScheduledExecutorService chooseCheckpointIOExecutor(long streamId) {
        return this.checkpointScheduler.chooseThread(streamId);
    }

    /**
     * Picks the local state directory for a stream.
     *
     * <p>{@code Math.floorMod} keeps the index non-negative even for a negative id, where the
     * plain {@code %} operator would produce a negative array index.
     */
    private File chooseLocalStoreDir(long streamId) {
        int index = (int) Math.floorMod(streamId, (long) this.localStateDirs.length);
        return this.localStateDirs[index];
    }

    /** Formats an id as a zero-padded, 18-digit decimal string. */
    static String normalizedName(long j) {
        return String.format("%018d", j);
    }

    /** Builds the stream name {@code streams_<j>_<j2>_<j3>} with each id zero-padded to 18 digits. */
    static String streamName(long j, long j2, long j3) {
        return String.format("%s_%018d_%018d_%018d", "streams", j, j2, j3);
    }

    /**
     * Registers an initialized store in the cache.
     *
     * <p>If a store is already registered for the same container and range, the new store is
     * closed asynchronously and the registered one is kept.
     *
     * @return the store that is registered for the range after this call
     */
    private synchronized MVCCAsyncStore<byte[], byte[]> addStore(long scId, long streamId, long rangeId, MVCCAsyncStore<byte[], byte[]> store) {
        Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = this.stores.get(scId);
        if (null == scStores) {
            scStores = Maps.newHashMap();
            // The per-container map must be published, otherwise the store is never cached
            // and can never be found by getStore/closeStores/close.
            this.stores.put(scId, scStores);
        }
        RangeId rid = RangeId.of(streamId, rangeId);
        MVCCAsyncStore<byte[], byte[]> existing = scStores.get(rid);
        if (null != existing) {
            if (existing != store) {
                store.closeAsync();
            }
            return existing;
        }
        log.info("Add store (scId = {}, streamId = {}, rangeId = {}) at storage container ({})", scId, streamId, rangeId, scId);
        scStores.put(rid, store);
        return store;
    }

    /** Returns the cached store for the given container/stream/range, or null if there is none. */
    private synchronized MVCCAsyncStore<byte[], byte[]> getStore(long scId, long streamId, long rangeId) {
        Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = this.stores.get(scId);
        if (null == scStores) {
            return null;
        }
        return scStores.get(RangeId.of(streamId, rangeId));
    }

    /**
     * Returns the cached store for the range if present, otherwise initializes a new one.
     *
     * @param j  storage container id
     * @param j2 stream id
     * @param j3 range id
     * @return future completing with the opened store
     */
    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory
    public CompletableFuture<MVCCAsyncStore<byte[], byte[]>> openStore(long j, long j2, long j3) {
        MVCCAsyncStore<byte[], byte[]> store = getStore(j, j2, j3);
        return null == store ? newStore(j, j2, j3) : FutureUtils.value(store);
    }

    /**
     * Creates and initializes a new store, then registers it in the cache.
     *
     * @param j  storage container id
     * @param j2 stream id
     * @param j3 range id
     * @return future completing with the store registered for the range, or failing with
     *         {@link ObjectClosedException} if this factory is already closed
     */
    CompletableFuture<MVCCAsyncStore<byte[], byte[]>> newStore(long j, long j2, long j3) {
        synchronized (this) {
            if (this.closed) {
                return FutureUtils.exception(new ObjectClosedException("MVCCStoreFactory"));
            }
            log.info("Initializing stream({})/range({}) at storage container ({})", j2, j3, j);
            MVCCAsyncStore<byte[], byte[]> store = this.storeSupplier.get();
            Path localStoreDir = Paths.get(
                chooseLocalStoreDir(j2).getAbsolutePath(),
                "ranges",
                normalizedName(j),
                normalizedName(j2),
                normalizedName(j3));
            String storeName = String.format("%s/%s/%s", normalizedName(j), normalizedName(j2), normalizedName(j3));
            if (null == this.checkpointStore) {
                this.checkpointStore = this.checkpointStoreSupplier.get();
            }
            StateStoreSpec spec = StateStoreSpec.builder()
                .name(storeName)
                .keyCoder(ByteArrayCoder.of())
                .valCoder(ByteArrayCoder.of())
                .localStateStoreDir(localStoreDir.toFile())
                .stream(streamName(j, j2, j3))
                .writeIOScheduler(chooseWriteIOExecutor(j2))
                .readIOScheduler(chooseReadIOExecutor(j2))
                .checkpointStore(this.checkpointStore)
                .checkpointDuration(CHECKPOINT_DURATION)
                .checkpointIOScheduler(chooseCheckpointIOExecutor(j2))
                .isReadonly(this.serveReadOnlyTable)
                .build();
            return store.init(spec).thenApply(ignored -> {
                log.info("Successfully initialize stream({})/range({}) at storage container ({})", j2, j3, j);
                // Return whichever store ended up registered: if another open won the race,
                // the store created here has been closed and must not be handed out.
                return addStore(j, j2, j3, store);
            });
        }
    }

    /**
     * Removes every cached store of a storage container and closes them asynchronously.
     *
     * @param j storage container id
     * @return future completing once all removed stores are closed; completes immediately
     *         when nothing is cached for the container
     */
    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory
    public CompletableFuture<Void> closeStores(long j) {
        Map<RangeId, MVCCAsyncStore<byte[], byte[]>> removed;
        synchronized (this) {
            removed = this.stores.remove(j);
        }
        if (null == removed) {
            return FutureUtils.Void();
        }
        ArrayList<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
        for (MVCCAsyncStore<byte[], byte[]> store : removed.values()) {
            closeFutures.add(store.closeAsync());
        }
        return FutureUtils.collect(closeFutures).thenApply(ignored -> null);
    }

    /**
     * Closes all cached stores, the checkpoint store (if one was created) and releases the
     * shared schedulers. Subsequent calls are no-ops.
     *
     * <p>Blocks until the stores have finished closing; a failure while closing them is
     * logged and does not prevent the remaining cleanup.
     */
    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory, java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            HashMap<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> snapshot = Maps.newHashMap(this.stores);
            // Drop the cached entries so that openStore cannot hand out a closed store later.
            this.stores.clear();
            ArrayList<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
            for (Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores : snapshot.values()) {
                for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) {
                    closeFutures.add(store.closeAsync());
                }
            }
            try {
                FutureUtils.result(FutureUtils.collect(closeFutures));
                log.info("Successfully closed all the range stores opened by this range factory");
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                log.warn("Encountered issue on closing all the range stores opened by this range factory", e);
            }
            if (null != this.checkpointStore) {
                this.checkpointStore.close();
                this.checkpointStore = null;
            }
            SharedResourceManager.shared().release(this.storageResources.ioWriteScheduler(), this.writeIOScheduler);
            SharedResourceManager.shared().release(this.storageResources.ioReadScheduler(), this.readIOScheduler);
            SharedResourceManager.shared().release(this.storageResources.checkpointScheduler(), this.checkpointScheduler);
        }
    }

    /** Returns the write I/O scheduler acquired by this factory. */
    OrderedScheduler writeIOScheduler() {
        return this.writeIOScheduler;
    }

    /** Returns the read I/O scheduler acquired by this factory. */
    OrderedScheduler readIOScheduler() {
        return this.readIOScheduler;
    }

    /** Returns the checkpoint scheduler acquired by this factory. */
    OrderedScheduler checkpointScheduler() {
        return this.checkpointScheduler;
    }
}
