package org.apache.distributedlog.impl;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.feature.FeatureProvider;
import dlshade.org.apache.bookkeeper.net.NodeBase;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import dlshade.org.apache.bookkeeper.util.CertUtils;
import dlshade.org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import dlshade.org.apache.commons.lang.SystemUtils;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.ZooKeeper;
import dlshade.org.apache.zookeeper.common.PathUtils;
import dlshade.org.apache.zookeeper.data.Stat;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.acl.DefaultAccessControlManager;
import org.apache.distributedlog.api.MetadataAccessor;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
import org.apache.distributedlog.bk.LedgerAllocator;
import org.apache.distributedlog.bk.LedgerAllocatorUtils;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.impl.acl.ZKAccessControlManager;
import org.apache.distributedlog.impl.federated.FederatedZKLogMetadataStore;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
import org.apache.distributedlog.impl.subscription.ZKSubscriptionsStore;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.metadata.LogMetadataForReader;
import org.apache.distributedlog.metadata.LogMetadataStore;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.namespace.NamespaceDriverManager;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/impl/BKNamespaceDriver.class */
/**
 * {@link NamespaceDriver} implementation that registers itself under the {@code "bk"} scheme.
 *
 * <p>{@link #initialize} wires together, for a single namespace URI: writer/reader ZooKeeper
 * clients, writer/reader BookKeeper clients (sharing one Netty event loop group and one request
 * timer), the log and log-stream metadata stores and, when enabled in the configuration, a
 * ledger allocator pool. {@link #close()} releases the resources this driver created; the
 * scheduler passed to {@code initialize} is not shut down by this class.
 *
 * <p>Entry stores and the access control manager are created lazily on first request without
 * additional locking.
 */
public class BKNamespaceDriver implements NamespaceDriver {
    private static final Logger LOG = LoggerFactory.getLogger(BKNamespaceDriver.class);

    // Values captured from the arguments of initialize().
    private DistributedLogConfiguration conf;
    private DynamicDistributedLogConfiguration dynConf;
    private URI namespace;
    private OrderedScheduler scheduler;
    private FeatureProvider featureProvider;
    private AsyncFailureInjector failureInjector;
    private StatsLogger statsLogger;
    private StatsLogger perLogStatsLogger;
    private String clientId;
    private int regionId;

    // Namespace-level configuration resolved through the writer ZooKeeper client.
    private BKDLConfig bkdlConfig;

    // ZooKeeper clients; the reader builder is the writer builder when both server lists are equal.
    private ZooKeeperClientBuilder sharedWriterZKCBuilder;
    private ZooKeeperClient writerZKC;
    private ZooKeeperClientBuilder sharedReaderZKCBuilder;
    private ZooKeeperClient readerZKC;

    // Netty resources handed to the BookKeeper client builders.
    private EventLoopGroup eventLoopGroup;
    private HashedWheelTimer requestTimer;

    // BookKeeper clients; the reader builder is the writer builder when both server lists are equal.
    private BookKeeperClientBuilder sharedWriterBKCBuilder;
    private BookKeeperClient writerBKC;
    private BookKeeperClientBuilder sharedReaderBKCBuilder;
    private BookKeeperClient readerBKC;

    // Metadata stores, allocator and lazily created components.
    private LogMetadataStore metadataStore;
    private LogStreamMetadataStore writerStreamMetadataStore;
    private LogStreamMetadataStore readerStreamMetadataStore;
    private LedgerAllocator allocator;
    private LogSegmentEntryStore writerEntryStore;
    private LogSegmentEntryStore readerEntryStore;
    private AccessControlManager accessControlManager;

    protected boolean initialized = false;
    protected AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Derives the ZooKeeper server list from a namespace URI.
     *
     * @param uri namespace URI; its authority must not be {@code null}
     * @return the URI authority with every {@code CertUtils.OU_VALUES_SEPARATOR} replaced by
     *         a comma
     */
    public static String getZKServersFromDLUri(URI uri) {
        // NOTE(review): the separator comes from a certificate utility class; presumably it is
        // just the host-list separator constant picked up by tooling - confirm.
        return uri.getAuthority().replace(CertUtils.OU_VALUES_SEPARATOR, ",");
    }

    /**
     * Initializes the driver for the given namespace. Calling it again on an already
     * initialized driver is a no-op.
     *
     * <p>If any wiring step fails, the resources created by earlier steps are released before
     * the failure is propagated, so a failed attempt does not leak clients or threads.
     *
     * @param distributedLogConfiguration static configuration
     * @param dynamicDistributedLogConfiguration dynamic configuration handed to the entry stores
     * @param uri namespace URI; must have a non-null authority and path
     * @param orderedScheduler scheduler shared with the stores created by this driver
     * @param featureProvider feature provider; its {@code "bkc"} scope is given to the writer
     *        BookKeeper client builder
     * @param asyncFailureInjector failure injector handed to the entry stores
     * @param statsLogger stats logger used for clients and stores
     * @param statsLogger2 per-log stats logger (stored by this driver)
     * @param str client id
     * @param i region id
     * @return this driver
     * @throws IOException if the URI is incomplete or any resource fails to initialize
     */
    @Override
    public synchronized NamespaceDriver initialize(DistributedLogConfiguration distributedLogConfiguration,
                                                   DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration,
                                                   URI uri,
                                                   OrderedScheduler orderedScheduler,
                                                   FeatureProvider featureProvider,
                                                   AsyncFailureInjector asyncFailureInjector,
                                                   StatsLogger statsLogger,
                                                   StatsLogger statsLogger2,
                                                   String str,
                                                   int i) throws IOException {
        if (this.initialized) {
            return this;
        }
        if (null == uri || null == uri.getAuthority() || null == uri.getPath()) {
            throw new IOException("Incorrect distributedlog namespace : " + uri);
        }
        this.conf = distributedLogConfiguration;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.namespace = uri;
        this.scheduler = orderedScheduler;
        this.featureProvider = featureProvider;
        this.failureInjector = asyncFailureInjector;
        this.statsLogger = statsLogger;
        this.perLogStatsLogger = statsLogger2;
        this.clientId = str;
        this.regionId = i;
        try {
            // Order matters: the BK clients and the stores need the config resolved via ZooKeeper.
            initializeZooKeeperClients();
            initializeBookKeeperClients();
            BKDLConfig.propagateConfiguration(this.bkdlConfig, distributedLogConfiguration);
            initializeLogStreamMetadataStores();
            initializeOtherResources();
        } catch (IOException | RuntimeException e) {
            // Do not leak the clients/threads created by the steps that did succeed.
            releaseAfterFailedInitialization();
            throw e;
        }
        this.initialized = true;
        LOG.info("Initialized BK namespace driver: clientId = {}, regionId = {}, federated = {}.",
                str, i, this.bkdlConfig.isFederatedNamespace());
        return this;
    }

    /**
     * Releases whatever a failed {@link #initialize} attempt managed to create and clears the
     * corresponding fields so that a later attempt starts from a clean state and stale handles
     * are never closed twice. Failures during this cleanup are logged, not thrown, so that the
     * original initialization error is the one the caller sees.
     */
    private void releaseAfterFailedInitialization() {
        try {
            doClose();
        } catch (RuntimeException cleanupFailure) {
            LOG.warn("Failed to release resources after unsuccessful initialization of " + this.namespace,
                    cleanupFailure);
        }
        this.accessControlManager = null;
        this.allocator = null;
        this.writerEntryStore = null;
        this.readerEntryStore = null;
        this.metadataStore = null;
        this.writerStreamMetadataStore = null;
        this.readerStreamMetadataStore = null;
        this.writerBKC = null;
        this.readerBKC = null;
        this.sharedWriterBKCBuilder = null;
        this.sharedReaderBKCBuilder = null;
        this.writerZKC = null;
        this.readerZKC = null;
        this.sharedWriterZKCBuilder = null;
        this.sharedReaderZKCBuilder = null;
        this.eventLoopGroup = null;
        this.requestTimer = null;
        this.bkdlConfig = null;
    }

    /**
     * Builds the writer ZooKeeper client from the namespace URI, resolves the namespace
     * configuration through it, and then builds the reader client. The reader reuses the
     * writer's builder when both roles are configured with the same server list.
     */
    private void initializeZooKeeperClients() throws IOException {
        this.sharedWriterZKCBuilder = createZKClientBuilder(
                String.format("dlzk:%s:factory_writer_shared", this.namespace),
                this.conf,
                getZKServersFromDLUri(this.namespace),
                this.statsLogger.scope("dlzk_factory_writer_shared"));
        this.writerZKC = this.sharedWriterZKCBuilder.build();
        this.bkdlConfig = BKDLConfig.resolveDLConfig(this.writerZKC, this.namespace);
        if (this.bkdlConfig.getDlZkServersForWriter().equals(this.bkdlConfig.getDlZkServersForReader())) {
            this.sharedReaderZKCBuilder = this.sharedWriterZKCBuilder;
        } else {
            this.sharedReaderZKCBuilder = createZKClientBuilder(
                    String.format("dlzk:%s:factory_reader_shared", this.namespace),
                    this.conf,
                    this.bkdlConfig.getDlZkServersForReader(),
                    this.statsLogger.scope("dlzk_factory_reader_shared"));
        }
        this.readerZKC = this.sharedReaderZKCBuilder.build();
    }

    /** Returns the namespace configuration resolved during initialization. */
    private synchronized BKDLConfig getBkdlConfig() {
        return this.bkdlConfig;
    }

    /**
     * Creates the Netty event loop group used for BookKeeper I/O, with threads named
     * {@code DL-io-%s}.
     *
     * @param i number of threads passed to the event loop group constructor
     * @return an epoll group on Linux when it can be constructed, otherwise an NIO group
     */
    static EventLoopGroup getDefaultEventLoopGroup(int i) {
        ThreadFactory ioThreadFactory = new ThreadFactoryBuilder().setNameFormat("DL-io-%s").build();
        if (!SystemUtils.IS_OS_LINUX) {
            return new NioEventLoopGroup(i, ioThreadFactory);
        }
        try {
            return new EpollEventLoopGroup(i, ioThreadFactory);
        } catch (Throwable th) {
            // Deliberately broad: any failure to construct the epoll group falls back to NIO.
            LOG.warn("Could not use Netty Epoll event loop for bookie server:", th);
            return new NioEventLoopGroup(i, ioThreadFactory);
        }
    }

    /**
     * Creates the shared event loop group and request timer, then the writer and reader
     * BookKeeper clients. The reader reuses the writer's builder when both roles point at the
     * same ZooKeeper servers; a dedicated reader builder is created without a feature provider.
     */
    private void initializeBookKeeperClients() throws IOException {
        this.eventLoopGroup = getDefaultEventLoopGroup(this.conf.getBKClientNumberIOThreads());
        this.requestTimer = new HashedWheelTimer(
                new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(),
                this.conf.getTimeoutTimerTickDurationMs(),
                TimeUnit.MILLISECONDS,
                this.conf.getTimeoutTimerNumTicks());
        this.sharedWriterBKCBuilder = createBKCBuilder(
                String.format("bk:%s:factory_writer_shared", this.namespace),
                this.conf,
                this.bkdlConfig.getBkZkServersForWriter(),
                this.bkdlConfig.getBkLedgersPath(),
                this.eventLoopGroup,
                this.requestTimer,
                Optional.of(this.featureProvider.scope("bkc")),
                this.statsLogger);
        this.writerBKC = this.sharedWriterBKCBuilder.build();
        if (this.bkdlConfig.getBkZkServersForWriter().equals(this.bkdlConfig.getBkZkServersForReader())) {
            this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder;
        } else {
            this.sharedReaderBKCBuilder = createBKCBuilder(
                    String.format("bk:%s:factory_reader_shared", this.namespace),
                    this.conf,
                    this.bkdlConfig.getBkZkServersForReader(),
                    this.bkdlConfig.getBkLedgersPath(),
                    this.eventLoopGroup,
                    this.requestTimer,
                    Optional.empty(),
                    this.statsLogger);
        }
        this.readerBKC = this.sharedReaderBKCBuilder.build();
    }

    /**
     * Creates the namespace-level metadata store (federated when either the resolved namespace
     * configuration or the client configuration asks for it) and one stream metadata store per
     * role, each backed by the matching ZooKeeper client.
     */
    private void initializeLogStreamMetadataStores() throws IOException {
        boolean federated = this.bkdlConfig.isFederatedNamespace() || this.conf.isFederatedNamespaceEnabled();
        if (federated) {
            this.metadataStore = new FederatedZKLogMetadataStore(this.conf, this.namespace, this.readerZKC, this.scheduler);
        } else {
            this.metadataStore = new ZKLogMetadataStore(this.conf, this.namespace, this.readerZKC, this.scheduler);
        }
        this.writerStreamMetadataStore =
                new ZKLogStreamMetadataStore(this.clientId, this.conf, this.writerZKC, this.scheduler, this.statsLogger);
        this.readerStreamMetadataStore =
                new ZKLogStreamMetadataStore(this.clientId, this.conf, this.readerZKC, this.scheduler, this.statsLogger);
    }

    /**
     * Validates the configured ledger allocator pool settings and builds the full pool path
     * {@code <uri path>/<pool path>/<pool name>}.
     *
     * <p>The configured pool path must be non-null, start with {@code "."} and must not end
     * with the path separator; the pool name must be non-null; the combined path must pass
     * {@code PathUtils.validatePath}.
     *
     * @param distributedLogConfiguration configuration providing pool path and pool name
     * @param uri namespace URI whose path is used as the prefix
     * @return the validated full pool path
     * @throws IOException if any of the checks above fails
     */
    @VisibleForTesting
    public static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration distributedLogConfiguration, URI uri) throws IOException {
        String poolPath = distributedLogConfiguration.getLedgerAllocatorPoolPath();
        LOG.info("PoolPath is {}", poolPath);
        if (null == poolPath || !poolPath.startsWith(".") || poolPath.endsWith(NodeBase.PATH_SEPARATOR_STR)) {
            LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool: {}", poolPath);
            throw new IOException("Invalid ledger allocator pool path specified : " + poolPath);
        }
        String poolName = distributedLogConfiguration.getLedgerAllocatorPoolName();
        if (null == poolName) {
            LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool.");
            throw new IOException("No ledger allocator name specified when enabling ledger allocator pool.");
        }
        String fullPoolPath = uri.getPath() + NodeBase.PATH_SEPARATOR_STR + poolPath
                + NodeBase.PATH_SEPARATOR_STR + poolName;
        try {
            PathUtils.validatePath(fullPoolPath);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool: {}", poolPath);
            // Keep the validation failure as the cause so the offending detail is not lost.
            throw new IOException("Invalid ledger allocator pool path specified : " + poolPath, e);
        }
        return fullPoolPath;
    }

    /**
     * Creates and starts the ledger allocator pool when it is enabled in the configuration;
     * otherwise leaves the allocator {@code null}.
     */
    private void initializeOtherResources() throws IOException {
        if (!this.conf.getEnableLedgerAllocatorPool()) {
            this.allocator = null;
            return;
        }
        String poolPath = validateAndGetFullLedgerAllocatorPoolPath(this.conf, this.namespace);
        this.allocator = LedgerAllocatorUtils.createLedgerAllocatorPool(
                poolPath,
                this.conf.getLedgerAllocatorPoolCoreSize(),
                this.conf,
                this.writerZKC,
                this.writerBKC,
                this.scheduler);
        if (null != this.allocator) {
            this.allocator.start();
        }
        LOG.info("Created ledger allocator pool under {} with size {}.",
                poolPath, this.conf.getLedgerAllocatorPoolCoreSize());
    }

    /**
     * Fails when the driver has been closed.
     *
     * @throws IOException an {@link AlreadyClosedException} if {@link #close()} was called
     */
    private void checkState() throws IOException {
        if (this.closed.get()) {
            LOG.error("BK namespace driver {} is already closed", this.namespace);
            throw new AlreadyClosedException("BK namespace driver " + this.namespace + " is already closed");
        }
    }

    /**
     * Closes the driver. Only the first call releases resources; later calls do nothing.
     */
    @Override
    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            doClose();
        }
    }

    /**
     * Releases every resource this driver created, skipping those that were never created so
     * that closing a never-initialized or partly-initialized driver does not fail with a
     * {@link NullPointerException}.
     */
    private void doClose() {
        if (null != this.accessControlManager) {
            this.accessControlManager.close();
            LOG.info("Access Control Manager Stopped.");
        }
        if (null != this.allocator) {
            Utils.closeQuietly(this.allocator);
            LOG.info("Ledger Allocator stopped.");
        }
        if (null != this.writerStreamMetadataStore) {
            Utils.close(this.writerStreamMetadataStore);
        }
        if (null != this.readerStreamMetadataStore) {
            Utils.close(this.readerStreamMetadataStore);
        }
        // Clients are closed before the event loop group and timer they were built with.
        if (null != this.writerBKC) {
            this.writerBKC.close();
        }
        if (null != this.readerBKC) {
            this.readerBKC.close();
        }
        if (null != this.writerZKC) {
            this.writerZKC.close();
        }
        if (null != this.readerZKC) {
            this.readerZKC.close();
        }
        if (null != this.eventLoopGroup) {
            this.eventLoopGroup.shutdownGracefully();
            LOG.info("Release external resources used by channel factory.");
        }
        if (null != this.requestTimer) {
            this.requestTimer.stop();
            LOG.info("Stopped request timer");
        }
    }

    /** Returns the namespace URI this driver was initialized with. */
    @Override
    public URI getUri() {
        return this.namespace;
    }

    /** Returns the scheme handled by this driver, {@code "bk"}. */
    @Override
    public String getScheme() {
        return "bk";
    }

    /** Returns the namespace-level log metadata store created during initialization. */
    @Override
    public LogMetadataStore getLogMetadataStore() {
        return this.metadataStore;
    }

    /**
     * Returns the stream metadata store for the given role.
     *
     * @param role {@code WRITER} selects the writer store; any other value selects the reader store
     */
    @Override
    public LogStreamMetadataStore getLogStreamMetadataStore(NamespaceDriver.Role role) {
        return NamespaceDriver.Role.WRITER == role ? this.writerStreamMetadataStore : this.readerStreamMetadataStore;
    }

    /**
     * Returns the log segment entry store for the given role, creating it on first use.
     *
     * @param role {@code WRITER} selects the writer store; any other value selects the reader store
     */
    @Override
    public LogSegmentEntryStore getLogSegmentEntryStore(NamespaceDriver.Role role) {
        return NamespaceDriver.Role.WRITER == role ? getWriterEntryStore() : getReaderEntryStore();
    }

    /** Lazily creates the writer entry store on top of the writer ZooKeeper and BookKeeper clients. */
    private LogSegmentEntryStore getWriterEntryStore() {
        if (null == this.writerEntryStore) {
            this.writerEntryStore = new BKLogSegmentEntryStore(this.conf, this.dynConf, this.writerZKC, this.writerBKC,
                    this.scheduler, this.allocator, this.statsLogger, this.failureInjector);
        }
        return this.writerEntryStore;
    }

    /** Lazily creates the reader entry store on top of the reader BookKeeper client. */
    private LogSegmentEntryStore getReaderEntryStore() {
        if (null == this.readerEntryStore) {
            // NOTE(review): the reader store is given the *writer* ZooKeeper client together with
            // the reader BookKeeper client - confirm this is intended before changing it.
            this.readerEntryStore = new BKLogSegmentEntryStore(this.conf, this.dynConf, this.writerZKC, this.readerBKC,
                    this.scheduler, this.allocator, this.statsLogger, this.failureInjector);
        }
        return this.readerEntryStore;
    }

    /**
     * Returns the access control manager, creating it on first use: the default manager when
     * no ACL root path is configured for the namespace, otherwise a ZooKeeper-based manager
     * rooted at {@code <namespace path>/<acl root path>}.
     *
     * @throws IOException if the configured ACL root path is not a reserved stream name
     */
    @Override
    public AccessControlManager getAccessControlManager() throws IOException {
        if (null != this.accessControlManager) {
            return this.accessControlManager;
        }
        String aclRootPath = getBkdlConfig().getACLRootPath();
        if (aclRootPath == null) {
            this.accessControlManager = DefaultAccessControlManager.INSTANCE;
            LOG.info("Created default access control manager for {}", this.namespace);
            return this.accessControlManager;
        }
        if (!DLUtils.isReservedStreamName(aclRootPath)) {
            throw new IOException("Invalid Access Control List Root Path : " + aclRootPath);
        }
        String zkRootPath = this.namespace.getPath() + NodeBase.PATH_SEPARATOR_STR + aclRootPath;
        LOG.info("Creating zk based access control manager @ {} for {}", zkRootPath, this.namespace);
        this.accessControlManager = new ZKAccessControlManager(this.conf, this.readerZKC, zkRootPath, this.scheduler);
        LOG.info("Created zk based access control manager @ {} for {}", zkRootPath, this.namespace);
        return this.accessControlManager;
    }

    /**
     * Creates a new subscriptions store for the given stream, backed by the writer ZooKeeper
     * client.
     *
     * @param str stream name
     */
    @Override
    public SubscriptionsStore getSubscriptionsStore(String str) {
        String subscribersPath =
                LogMetadataForReader.getSubscribersPath(this.namespace, str, this.conf.getUnpartitionedStreamName());
        return new ZKSubscriptionsStore(this.writerZKC, subscribersPath);
    }

    /**
     * Creates a metadata accessor for the given stream.
     *
     * @param str stream name; validated and normalized before use
     * @throws UnsupportedOperationException if the namespace is federated
     * @throws IOException if the driver is already closed
     * @throws InvalidStreamNameException declared for stream-name validation failures
     */
    @Override
    public MetadataAccessor getMetadataAccessor(String str) throws InvalidStreamNameException, IOException {
        if (getBkdlConfig().isFederatedNamespace()) {
            throw new UnsupportedOperationException();
        }
        checkState();
        String streamName = DLUtils.validateAndNormalizeName(str);
        return new ZKMetadataAccessor(streamName, this.conf, this.namespace,
                this.sharedWriterZKCBuilder, this.sharedReaderZKCBuilder, this.statsLogger);
    }

    /**
     * Lists the children of the namespace path that are not reserved stream names, mapped to
     * the data stored on each child node.
     *
     * @return child name to node data; an empty array is stored for a child that no longer
     *         exists when it is read; an empty map if the namespace path does not exist
     * @throws IOException on ZooKeeper errors or if the calling thread is interrupted (the
     *         interrupt flag is restored)
     */
    public Map<String, byte[]> enumerateLogsWithMetadataInNamespace() throws IOException, IllegalArgumentException {
        String namespaceRootPath = this.namespace.getPath();
        Map<String, byte[]> logsWithMetadata = new HashMap<>();
        try {
            ZooKeeper zk = Utils.sync(this.writerZKC, namespaceRootPath);
            if (zk.exists(namespaceRootPath, false) == null) {
                return logsWithMetadata;
            }
            for (String child : zk.getChildren(namespaceRootPath, false)) {
                if (DLUtils.isReservedStreamName(child)) {
                    continue;
                }
                String childPath = String.format("%s/%s", namespaceRootPath, child);
                Stat childStat = zk.exists(childPath, false);
                byte[] childData = (childStat == null) ? new byte[0] : zk.getData(childPath, false, childStat);
                logsWithMetadata.put(child, childData);
            }
            return logsWithMetadata;
        } catch (KeeperException e) {
            String message = "Error reading " + namespaceRootPath + " entry in zookeeper";
            LOG.error(message, e);
            throw new IOException(message, e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            String message = "Interrupted while reading " + namespaceRootPath;
            LOG.error(message, e2);
            throw new IOException(message, e2);
        }
    }

    /**
     * Creates a ZooKeeper client builder from the client configuration.
     *
     * @param str name given to the client
     * @param distributedLogConfiguration configuration supplying timeouts, retry and ACL settings
     * @param str2 ZooKeeper server list
     * @param statsLogger stats logger for the client
     * @return the configured builder; its retry policy is {@code null} when the configured
     *         number of retries is not positive
     */
    public static ZooKeeperClientBuilder createZKClientBuilder(String str, DistributedLogConfiguration distributedLogConfiguration, String str2, StatsLogger statsLogger) {
        BoundExponentialBackoffRetryPolicy retryPolicy = null;
        if (distributedLogConfiguration.getZKNumRetries() > 0) {
            retryPolicy = new BoundExponentialBackoffRetryPolicy(
                    distributedLogConfiguration.getZKRetryBackoffStartMillis(),
                    distributedLogConfiguration.getZKRetryBackoffMaxMillis(),
                    distributedLogConfiguration.getZKNumRetries());
        }
        ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
                .name(str)
                .sessionTimeoutMs(distributedLogConfiguration.getZKSessionTimeoutMilliseconds())
                .retryThreadCount(distributedLogConfiguration.getZKClientNumberRetryThreads())
                .requestRateLimit(distributedLogConfiguration.getZKRequestRateLimit())
                .zkServers(str2)
                .retryPolicy(retryPolicy)
                .statsLogger(statsLogger)
                .zkAclId(distributedLogConfiguration.getZkAclId());
        LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {}, maxRetryBackoff = {}, zkAclId = {}.",
                str,
                str2,
                distributedLogConfiguration.getZKNumRetries(),
                distributedLogConfiguration.getZKSessionTimeoutMilliseconds(),
                distributedLogConfiguration.getZKRetryBackoffStartMillis(),
                distributedLogConfiguration.getZKRetryBackoffMaxMillis(),
                distributedLogConfiguration.getZkAclId());
        return builder;
    }

    /**
     * Creates a BookKeeper client builder sharing the given event loop group and request timer.
     *
     * @param str name given to the client
     * @param distributedLogConfiguration client configuration
     * @param str2 ZooKeeper servers used by BookKeeper
     * @param str3 ledgers path
     * @param eventLoopGroup Netty event loop group for I/O
     * @param hashedWheelTimer request timer
     * @param optional optional feature provider
     * @param statsLogger stats logger for the client
     * @return the configured builder
     */
    private BookKeeperClientBuilder createBKCBuilder(String str, DistributedLogConfiguration distributedLogConfiguration, String str2, String str3, EventLoopGroup eventLoopGroup, HashedWheelTimer hashedWheelTimer, Optional<FeatureProvider> optional, StatsLogger statsLogger) {
        BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder()
                .name(str)
                .dlConfig(distributedLogConfiguration)
                .zkServers(str2)
                .ledgersPath(str3)
                .eventLoopGroup(eventLoopGroup)
                .requestTimer(hashedWheelTimer)
                .featureProvider(optional)
                .statsLogger(statsLogger);
        LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}",
                str, str2, str3, distributedLogConfiguration.getBKClientNumberIOThreads());
        return builder;
    }

    /** Returns the writer ZooKeeper client. */
    @VisibleForTesting
    public ZooKeeperClient getWriterZKC() {
        return this.writerZKC;
    }

    /** Returns the reader BookKeeper client. */
    @VisibleForTesting
    public BookKeeperClient getReaderBKC() {
        return this.readerBKC;
    }

    /** Returns the failure injector passed to {@code initialize}. */
    @VisibleForTesting
    public AsyncFailureInjector getFailureInjector() {
        return this.failureInjector;
    }

    /** Returns the writer-side stream metadata store. */
    @VisibleForTesting
    public LogStreamMetadataStore getWriterStreamMetadataStore() {
        return this.writerStreamMetadataStore;
    }

    /** Returns the ledger allocator, or {@code null} when the allocator pool is disabled. */
    @VisibleForTesting
    public LedgerAllocator getLedgerAllocator() {
        return this.allocator;
    }

    // Register this driver for the "bk" scheme when the class is loaded.
    static {
        NamespaceDriverManager.registerDriver("bk", BKNamespaceDriver.class);
    }
}
