/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point object for a Pulsar Functions worker.
 *
 * <p>{@link #start()} checks that the configured Pulsar web service is reachable, makes sure the
 * Pulsar Functions namespace exists, initializes the dlog namespace, starts the
 * {@link WorkerService} and the {@link WorkerServer}, and then blocks on the
 * {@link ErrorNotifier} until an error is reported. {@link #stop()} releases everything that
 * {@code start()} and the constructor acquired.
 *
 * <p>This class performs no synchronization of its own; NOTE(review): it appears to be intended
 * for use from a single controlling thread - confirm against callers.
 */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);

    /** Delay between two consecutive attempts to reach the Pulsar web service. */
    private static final long BROKER_RETRY_INTERVAL_MS = 1000L;

    private final WorkerConfig workerConfig;
    private final WorkerService workerService;
    /** REST server; {@code null} until {@link #start()} has created it. */
    private WorkerServer server;
    /** Lazily created by {@link #getZooKeeperClientFactory()}. */
    private ZooKeeperClientFactory zkClientFactory = null;
    private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build();
    private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("zk-cache-callback"));
    /** Only created when authorization is enabled; {@code null} otherwise. */
    private GlobalZooKeeperCache globalZkCache;
    private ConfigurationCacheService configurationCacheService;
    private final ErrorNotifier errorNotifier;

    /**
     * Creates a worker for the given configuration. Nothing is started until {@link #start()}.
     *
     * @param workerConfig configuration used for every component created by this worker
     */
    public Worker(WorkerConfig workerConfig) {
        this.workerConfig = workerConfig;
        this.workerService = new WorkerService(workerConfig, true);
        this.errorNotifier = ErrorNotifier.getDefaultImpl();
    }

    /**
     * Initializes the environment, starts the worker service and the REST server, then blocks in
     * {@link ErrorNotifier#waitForError()}.
     *
     * @throws Exception if initialization or startup fails, or when the error notifier reports a
     *                   failure (which is logged and rethrown unchanged)
     */
    protected void start() throws Exception {
        URI dlogUri = initialize(this.workerConfig);
        this.workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService(), this.errorNotifier);
        this.server = new WorkerServer(this.workerService);
        this.server.start();
        log.info("/** Started worker server on port={} **/", this.workerConfig.getWorkerPort());
        try {
            this.errorNotifier.waitForError();
        } catch (Throwable th) {
            log.error("!-- Fatal error encountered. Worker will exit now. --!", th);
            throw th;
        }
    }

    /**
     * Prepares the Pulsar cluster for the worker and returns the dlog namespace URI.
     *
     * <p>Steps: wait for the Pulsar web service to answer, ensure the functions namespace exists,
     * fetch the broker's internal configuration, and initialize the dlog namespace from it. The
     * admin client is closed on every exit path, including failures of the first step.
     *
     * @param workerConfig worker configuration to read endpoints and names from
     * @return the URI returned by {@link WorkerUtils#initializeDlogNamespace}
     * @throws InterruptedException if interrupted while sleeping between connection attempts
     * @throws PulsarAdminException if any admin call fails in a way that cannot be recovered here
     * @throws IOException          if the dlog namespace cannot be initialized
     */
    private static URI initialize(WorkerConfig workerConfig) throws InterruptedException, PulsarAdminException, IOException {
        InternalConfigurationData internalConf;
        PulsarAdmin admin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), workerConfig.getBrokerClientAuthenticationPlugin(), workerConfig.getBrokerClientAuthenticationParameters(), workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(), workerConfig.isTlsEnableHostnameVerification());
        try {
            waitForPulsarService(admin, workerConfig);
            ensureFunctionsNamespace(admin, workerConfig);
            try {
                internalConf = admin.brokers().getInternalConfigurationData();
            } catch (PulsarAdminException e) {
                log.error("Failed to retrieve broker internal configuration", e);
                throw e;
            }
        } finally {
            // The client is only needed during bootstrap; release it even when bootstrap fails.
            admin.close();
        }
        try {
            return WorkerUtils.initializeDlogNamespace(internalConf);
        } catch (IOException ioe) {
            log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages", internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
            throw ioe;
        }
    }

    /**
     * Polls the Pulsar web service by listing clusters until a call succeeds.
     *
     * <p>After a failed call the method sleeps {@value #BROKER_RETRY_INTERVAL_MS} ms and retries,
     * up to {@code getInitialBrokerReconnectMaxRetries()} retries; the last failure is rethrown.
     */
    private static void waitForPulsarService(PulsarAdmin admin, WorkerConfig workerConfig) throws InterruptedException, PulsarAdminException {
        log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl());
        int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
        int retries = 0;
        while (true) {
            try {
                admin.clusters().getClusters();
                return;
            } catch (PulsarAdminException e) {
                log.warn("Failed to retrieve clusters from pulsar service", e);
                log.warn("Retry to connect to Pulsar service at {}", workerConfig.getPulsarWebServiceUrl());
                if (retries >= maxRetries) {
                    // Report the service URL that could not be reached (not the functions namespace).
                    log.error("Failed to connect to Pulsar service at {} after {} attempts", workerConfig.getPulsarWebServiceUrl(), maxRetries);
                    throw e;
                }
                ++retries;
                Thread.sleep(BROKER_RETRY_INTERVAL_MS);
            }
        }
    }

    /**
     * Makes sure the Pulsar Functions namespace exists, creating it when the policies lookup
     * answers NOT_FOUND.
     *
     * <p>A CONFLICT answer to the create call is tolerated.
     * NOTE(review): presumably this covers another worker creating the namespace concurrently.
     *
     * @throws PulsarAdminException if the lookup fails with anything other than NOT_FOUND, or the
     *                              creation fails with anything other than CONFLICT
     */
    private static void ensureFunctionsNamespace(PulsarAdmin admin, WorkerConfig workerConfig) throws PulsarAdminException {
        log.info("Initializing Pulsar Functions namespace...");
        String namespace = workerConfig.getPulsarFunctionsNamespace();
        try {
            admin.namespaces().getPolicies(namespace);
            return;
        } catch (PulsarAdminException e) {
            if (e.getStatusCode() != Response.Status.NOT_FOUND.getStatusCode()) {
                log.error("Failed to get retention policy for pulsar function namespace {}", namespace, e);
                throw e;
            }
            // NOT_FOUND: fall through and create the namespace.
        }
        try {
            Policies policies = new Policies();
            policies.retention_policies = new RetentionPolicies(-1, -1);
            policies.replication_clusters = new HashSet<>();
            policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
            admin.namespaces().createNamespace(namespace, policies);
        } catch (PulsarAdminException e1) {
            if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) {
                log.error("Failed to create namespace {} for pulsar functions", namespace, e1);
                throw e1;
            }
        }
    }

    /**
     * Builds the authorization service when authorization is enabled in the worker config.
     *
     * <p>Side effect: creates and starts {@link #globalZkCache} and creates
     * {@link #configurationCacheService}.
     *
     * @return the authorization service, or {@code null} when authorization is disabled
     * @throws PulsarServerException if the global ZooKeeper cache cannot be started
     */
    private AuthorizationService getAuthorizationService() throws PulsarServerException {
        if (!this.workerConfig.isAuthorizationEnabled()) {
            return null;
        }
        log.info("starting configuration cache service");
        this.globalZkCache = new GlobalZooKeeperCache(this.getZooKeeperClientFactory(), (int)this.workerConfig.getZooKeeperSessionTimeoutMillis(), this.workerConfig.getZooKeeperOperationTimeoutSeconds(), this.workerConfig.getConfigurationStoreServers(), this.orderedExecutor, this.cacheExecutor, this.workerConfig.getZooKeeperOperationTimeoutSeconds());
        try {
            this.globalZkCache.start();
        } catch (IOException e) {
            throw new PulsarServerException(e);
        }
        this.configurationCacheService = new ConfigurationCacheService(this.globalZkCache, this.workerConfig.getPulsarFunctionsCluster());
        return new AuthorizationService(this.getServiceConfiguration(), this.configurationCacheService);
    }

    /**
     * Builds an authentication service from the worker configuration.
     *
     * @throws PulsarServerException if the authentication service cannot be created
     */
    private AuthenticationService getAuthenticationService() throws PulsarServerException {
        return new AuthenticationService(this.getServiceConfiguration());
    }

    /**
     * Returns the ZooKeeper client factory, creating it on first use.
     *
     * @return the factory shared by this worker
     */
    public ZooKeeperClientFactory getZooKeeperClientFactory() {
        if (this.zkClientFactory == null) {
            this.zkClientFactory = new ZookeeperBkClientFactoryImpl(this.orderedExecutor);
        }
        return this.zkClientFactory;
    }

    /**
     * Stops the REST server, the worker service, the global ZooKeeper cache and the executors.
     *
     * <p>Each step is best-effort: a failure is logged and the remaining steps still run. Because
     * the executors are shut down, this instance cannot be started again afterwards.
     */
    protected void stop() {
        if (this.server != null) {
            try {
                this.server.stop();
            } catch (Exception e) {
                log.warn("Failed to gracefully stop worker server ", e);
            }
        }
        // Stopped in its own try block so a server failure cannot skip it.
        try {
            this.workerService.stop();
        } catch (Exception e) {
            log.warn("Failed to gracefully stop worker service ", e);
        }
        if (this.globalZkCache != null) {
            try {
                this.globalZkCache.close();
            } catch (IOException e) {
                log.warn("Failed to close global zk cache ", e);
            }
        }
        // Release the threads owned by this worker; already-submitted tasks are allowed to finish.
        this.cacheExecutor.shutdown();
        this.orderedExecutor.shutdown();
    }

    /**
     * @return the HTTP port reported by the REST server, or empty if the server has not been
     *         created yet
     */
    public Optional<Integer> getListenPortHTTP() {
        WorkerServer current = this.server;
        if (current == null) {
            return Optional.empty();
        }
        return current.getListenPortHTTP();
    }

    /**
     * @return the HTTPS port reported by the REST server, or empty if the server has not been
     *         created yet
     */
    public Optional<Integer> getListenPortHTTPS() {
        WorkerServer current = this.server;
        if (current == null) {
            return Optional.empty();
        }
        return current.getListenPortHTTPS();
    }

    /**
     * Converts the worker configuration into a broker {@link ServiceConfiguration} and sets its
     * cluster name to the Pulsar Functions cluster.
     */
    private ServiceConfiguration getServiceConfiguration() {
        ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(this.workerConfig);
        serviceConfiguration.setClusterName(this.workerConfig.getPulsarFunctionsCluster());
        return serviceConfiguration;
    }
}

