/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.packages.management.storage.bookkeeper;

import com.google.common.base.Strings;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
import org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageConfiguration;
import org.apache.pulsar.packages.management.storage.bookkeeper.DLInputStream;
import org.apache.pulsar.packages.management.storage.bookkeeper.DLOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookKeeperPackagesStorage
implements PackagesStorage {
    private static final Logger log = LoggerFactory.getLogger(BookKeeperPackagesStorage.class);
    private static final String NS_CLIENT_ID = "packages-management";
    public static final String ZK_SCHEME_IDENTIFIER = "zk:";
    final BookKeeperPackagesStorageConfiguration configuration;
    private Namespace namespace;

    BookKeeperPackagesStorage(PackagesStorageConfiguration configuration) {
        this.configuration = new BookKeeperPackagesStorageConfiguration(configuration);
    }

    public void initialize() {
        DistributedLogConfiguration conf = new DistributedLogConfiguration().setImmediateFlushEnabled(true).setOutputBufferSize(0).setWriteQuorumSize(this.configuration.getPackagesReplicas()).setEnsembleSize(this.configuration.getPackagesReplicas()).setAckQuorumSize(this.configuration.getPackagesReplicas()).setLockTimeout(0L);
        if (!Strings.isNullOrEmpty((String)this.configuration.getBookkeeperClientAuthenticationPlugin())) {
            conf.setProperty("bkc.clientAuthProviderFactoryClass", (Object)this.configuration.getBookkeeperClientAuthenticationPlugin());
            if (!Strings.isNullOrEmpty((String)this.configuration.getBookkeeperClientAuthenticationParametersName())) {
                conf.setProperty("bkc." + this.configuration.getBookkeeperClientAuthenticationParametersName(), (Object)this.configuration.getBookkeeperClientAuthenticationParameters());
            }
        }
        try {
            this.namespace = NamespaceBuilder.newBuilder().conf(conf).clientId(NS_CLIENT_ID).uri(this.initializeDlogNamespace()).build();
        }
        catch (IOException e) {
            throw new RuntimeException("Initialize distributed log for packages management service failed.", e);
        }
        log.info("Packages management bookKeeper storage initialized successfully");
    }

    private URI initializeDlogNamespace() throws IOException {
        URI dlogURI;
        block7: {
            String ledgersRootPath;
            String ledgersStoreServers;
            String bookkeeperMetadataServiceUri = this.configuration.getProperty("bookkeeperMetadataServiceUri");
            if (StringUtils.isNotBlank((CharSequence)bookkeeperMetadataServiceUri)) {
                URI metadataServiceUri = URI.create(bookkeeperMetadataServiceUri);
                ledgersStoreServers = metadataServiceUri.getAuthority().replace(";", ",");
                ledgersRootPath = metadataServiceUri.getPath();
            } else {
                ledgersRootPath = this.configuration.getPackagesManagementLedgerRootPath();
                if (StringUtils.isNotBlank((CharSequence)this.configuration.getMetadataStoreUrl())) {
                    ledgersStoreServers = this.configuration.getMetadataStoreUrl();
                    if (ledgersStoreServers.startsWith(ZK_SCHEME_IDENTIFIER)) {
                        ledgersStoreServers = ledgersStoreServers.substring(ZK_SCHEME_IDENTIFIER.length());
                    }
                } else {
                    ledgersStoreServers = this.configuration.getZookeeperServers();
                }
            }
            BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
            DLMetadata dlMetadata = DLMetadata.create((BKDLConfig)bkdlConfig);
            dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", ledgersStoreServers));
            try {
                dlMetadata.create(dlogURI);
            }
            catch (ZKException e) {
                if (e.getKeeperExceptionCode() != KeeperException.Code.NODEEXISTS) break block7;
                return dlogURI;
            }
        }
        return dlogURI;
    }

    private CompletableFuture<DistributedLogManager> openLogManagerAsync(String path) {
        CompletableFuture<DistributedLogManager> logFuture = new CompletableFuture<DistributedLogManager>();
        CompletableFuture.runAsync(() -> {
            try {
                logFuture.complete(this.namespace.openLog(path));
            }
            catch (IOException e) {
                logFuture.completeExceptionally(e);
            }
        });
        return logFuture;
    }

    public CompletableFuture<Void> writeAsync(String path, InputStream inputStream) {
        return ((CompletableFuture)((CompletableFuture)this.openLogManagerAsync(path).thenCompose(DLOutputStream::openWriterAsync)).thenCompose(dlOutputStream -> dlOutputStream.writeAsync(inputStream))).thenCompose(DLOutputStream::closeAsync);
    }

    public CompletableFuture<Void> readAsync(String path, OutputStream outputStream) {
        return ((CompletableFuture)((CompletableFuture)this.openLogManagerAsync(path).thenCompose(DLInputStream::openReaderAsync)).thenCompose(dlInputStream -> dlInputStream.readAsync(outputStream))).thenCompose(DLInputStream::closeAsync);
    }

    public CompletableFuture<Void> deleteAsync(String path) {
        return this.namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation(path).thenCompose(uri -> uri.map(value -> this.namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).deleteLog(value, path)).orElse(null));
    }

    public CompletableFuture<List<String>> listAsync(String path) {
        return this.namespace.getNamespaceDriver().getLogMetadataStore().getLogs(path).thenApply(logs -> {
            ArrayList packages = new ArrayList();
            logs.forEachRemaining(packages::add);
            return packages;
        });
    }

    public CompletableFuture<Boolean> existAsync(String path) {
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        this.namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation(path).whenComplete((uriOptional, throwable) -> {
            if (throwable != null) {
                result.complete(false);
                return;
            }
            if (uriOptional.isPresent()) {
                this.namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).logExists((URI)uriOptional.get(), path).whenComplete((ignore, e) -> {
                    if (e != null) {
                        result.complete(false);
                    } else {
                        result.complete(true);
                    }
                });
            } else {
                result.complete(false);
            }
        });
        return result;
    }

    public CompletableFuture<Void> closeAsync() {
        return CompletableFuture.runAsync(() -> this.namespace.close());
    }
}

