package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/Worker.class */
/**
 * Entry point object for a Pulsar Functions worker.
 *
 * <p>{@link #start()} prepares the cluster-side state the worker needs (see
 * {@link #initialize(WorkerConfig)}), starts the {@link WorkerService} and the
 * {@link WorkerServer}, and then blocks on the {@link ErrorNotifier} until a fatal
 * error is reported. {@link #stop()} tears the pieces down again on a best-effort basis.
 */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private final WorkerConfig workerConfig;
    private final WorkerService workerService;
    // Assigned in start(); null until then.
    private WorkerServer server;
    // Only created when authorization is enabled (see getAuthorizationService()).
    private GlobalZooKeeperCache globalZkCache;
    private ConfigurationCacheService configurationCacheService;
    // Lazily created by getZooKeeperClientFactory().
    private ZooKeeperClientFactory zkClientFactory = null;
    // NOTE(review): neither executor below is shut down by stop(); confirm whether
    // GlobalZooKeeperCache.close() takes care of them or whether they leak threads.
    private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build();
    private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("zk-cache-callback"));
    private final ErrorNotifier errorNotifier = ErrorNotifier.getDefaultImpl();

    /**
     * Creates a worker and its {@link WorkerService} from the given configuration.
     * Nothing is started until {@link #start()} is called.
     *
     * @param workerConfig configuration used by this worker and passed to the worker service
     */
    public Worker(WorkerConfig workerConfig) {
        this.workerConfig = workerConfig;
        this.workerService = new WorkerService(workerConfig);
    }

    /**
     * Initializes cluster state, starts the worker service and the worker server, then
     * blocks in {@link ErrorNotifier#waitForError()}.
     *
     * @throws Exception if initialization or startup fails, or when the error notifier
     *                   reports a fatal error (which is logged and rethrown)
     */
    public void start() throws Exception {
        this.workerService.start(initialize(this.workerConfig), getAuthenticationService(), getAuthorizationService(), this.errorNotifier);
        this.server = new WorkerServer(this.workerService);
        this.server.start();
        log.info("/** Started worker server on port={} **/", this.workerConfig.getWorkerPort());
        try {
            this.errorNotifier.waitForError();
        } catch (Throwable th) {
            log.error("!-- Fatal error encountered. Worker will exit now. --!", th);
            throw th;
        }
    }

    /**
     * Prepares everything the worker needs on the cluster side and returns the URI produced
     * by {@code WorkerUtils.initializeDlogNamespace}.
     *
     * <p>The work happens in three strictly separated phases, all sharing one admin client
     * that is closed on every exit path:
     * <ol>
     *   <li>probe the broker with {@code getClusters()}, retrying once per second up to
     *       {@code getInitialBrokerReconnectMaxRetries()} times;</li>
     *   <li>make sure the Pulsar Functions namespace exists, creating it when the policies
     *       lookup answers 404 (a 409 from the create call is tolerated);</li>
     *   <li>fetch the broker's internal configuration and initialize the dlog namespace.</li>
     * </ol>
     * Only phase 1 is retried; failures in phases 2 and 3 propagate immediately.
     *
     * @param workerConfig worker configuration supplying URLs, credentials and namespace names
     * @return the URI returned by {@code WorkerUtils.initializeDlogNamespace}
     * @throws InterruptedException if the thread is interrupted while waiting between probes
     * @throws PulsarAdminException if the broker stays unreachable or an admin call fails
     * @throws IOException          if dlog namespace initialization fails
     */
    private static URI initialize(WorkerConfig workerConfig) throws InterruptedException, PulsarAdminException, IOException {
        PulsarAdmin admin = WorkerUtils.getPulsarAdminClient(
                workerConfig.getPulsarWebServiceUrl(),
                workerConfig.getClientAuthenticationPlugin(),
                workerConfig.getClientAuthenticationParameters(),
                workerConfig.getTlsTrustCertsFilePath(),
                Boolean.valueOf(workerConfig.isTlsAllowInsecureConnection()),
                Boolean.valueOf(workerConfig.isTlsHostnameVerificationEnable()));
        try {
            // Phase 1: wait for the broker. Only this probe is retried.
            log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl());
            int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
            int retries = 0;
            while (true) {
                try {
                    admin.clusters().getClusters();
                    break;
                } catch (PulsarAdminException e) {
                    log.warn("Failed to retrieve clusters from pulsar service", e);
                    if (retries >= maxRetries) {
                        log.error("Failed to connect to Pulsar service at {} after {} attempts", workerConfig.getPulsarWebServiceUrl(), Integer.valueOf(retries + 1));
                        throw e;
                    }
                    log.warn("Retry to connect to Pulsar service at {}", workerConfig.getPulsarWebServiceUrl());
                    retries++;
                    Thread.sleep(1000L);
                }
            }

            // Phase 2: ensure the functions namespace exists.
            log.info("Initializing Pulsar Functions namespace...");
            try {
                admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
            } catch (PulsarAdminException e) {
                if (e.getStatusCode() != Response.Status.NOT_FOUND.getStatusCode()) {
                    log.error("Failed to get retention policy for pulsar function namespace {}", workerConfig.getPulsarFunctionsNamespace(), e);
                    throw e;
                }
                // 404: the namespace does not exist yet, so create it.
                try {
                    Policies policies = new Policies();
                    policies.retention_policies = new RetentionPolicies(-1, -1);
                    policies.replication_clusters = new HashSet<>();
                    policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
                    admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(), policies);
                } catch (PulsarAdminException createError) {
                    // 409 is tolerated: the namespace appeared between the lookup and the create.
                    if (createError.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) {
                        log.error("Failed to create namespace {} for pulsar functions", workerConfig.getPulsarFunctionsNamespace(), createError);
                        throw createError;
                    }
                }
            }

            // Phase 3: fetch broker internals and set up the dlog namespace.
            InternalConfigurationData internalConf;
            try {
                internalConf = admin.brokers().getInternalConfigurationData();
            } catch (PulsarAdminException e) {
                log.error("Failed to retrieve broker internal configuration", e);
                throw e;
            }
            try {
                return WorkerUtils.initializeDlogNamespace(internalConf);
            } catch (IOException e) {
                log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages", new Object[]{internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), e});
                throw e;
            }
        } finally {
            // Runs on success, on exhausted retries, on interruption and on any later failure.
            admin.close();
        }
    }

    /**
     * Builds the authorization service, or returns {@code null} when authorization is
     * disabled in the worker configuration.
     *
     * <p>When enabled, this creates and starts the global ZooKeeper cache and a
     * {@link ConfigurationCacheService} on top of it; both are kept in fields so that
     * {@link #stop()} can close the cache.
     *
     * @return the authorization service, or {@code null} if authorization is disabled
     * @throws PulsarServerException if starting the ZooKeeper cache fails with an
     *                               {@link IOException}, or if building the service fails
     */
    private AuthorizationService getAuthorizationService() throws PulsarServerException {
        if (!this.workerConfig.isAuthorizationEnabled()) {
            return null;
        }
        log.info("starting configuration cache service");
        // NOTE(review): the ZooKeeper operation timeout is passed twice (3rd and last
        // argument); confirm the last constructor parameter is meant to take the same value.
        this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) this.workerConfig.getZooKeeperSessionTimeoutMillis(), this.workerConfig.getZooKeeperOperationTimeoutSeconds(), this.workerConfig.getConfigurationStoreServers(), this.orderedExecutor, this.cacheExecutor, this.workerConfig.getZooKeeperOperationTimeoutSeconds());
        try {
            this.globalZkCache.start();
            this.configurationCacheService = new ConfigurationCacheService(this.globalZkCache, this.workerConfig.getPulsarFunctionsCluster());
            return new AuthorizationService(PulsarConfigurationLoader.convertFrom(this.workerConfig), this.configurationCacheService);
        } catch (IOException e) {
            throw new PulsarServerException(e);
        }
    }

    /**
     * Builds the authentication service from the worker configuration.
     *
     * @return a new authentication service
     * @throws PulsarServerException if the service cannot be created
     */
    private AuthenticationService getAuthenticationService() throws PulsarServerException {
        return new AuthenticationService(PulsarConfigurationLoader.convertFrom(this.workerConfig));
    }

    /**
     * Returns the ZooKeeper client factory, creating it on first use.
     * The lazy initialization is not synchronized.
     *
     * @return the ZooKeeper client factory used by this worker
     */
    public ZooKeeperClientFactory getZooKeeperClientFactory() {
        if (this.zkClientFactory == null) {
            this.zkClientFactory = new ZookeeperBkClientFactoryImpl(this.orderedExecutor);
        }
        return this.zkClientFactory;
    }

    /**
     * Stops the worker server, the worker service and the global ZooKeeper cache.
     *
     * <p>Each component is stopped independently: a failure in one is logged and does not
     * prevent the remaining components from being stopped. This method never throws.
     */
    public void stop() {
        if (this.server != null) {
            try {
                this.server.stop();
            } catch (Exception e) {
                log.warn("Failed to gracefully stop worker server ", e);
            }
        }
        try {
            this.workerService.stop();
        } catch (Exception e) {
            log.warn("Failed to gracefully stop worker service ", e);
        }
        if (this.globalZkCache != null) {
            try {
                this.globalZkCache.close();
            } catch (IOException e) {
                log.warn("Failed to close global zk cache ", e);
            }
        }
    }

    /**
     * Returns the HTTP port reported by the worker server.
     *
     * @return the server's HTTP listen port, or an empty {@link Optional} when the server
     *         has not been created yet (i.e. {@link #start()} has not run)
     */
    public Optional<Integer> getListenPortHTTP() {
        if (this.server == null) {
            return Optional.empty();
        }
        return this.server.getListenPortHTTP();
    }

    /**
     * Returns the HTTPS port reported by the worker server.
     *
     * @return the server's HTTPS listen port, or an empty {@link Optional} when the server
     *         has not been created yet (i.e. {@link #start()} has not run)
     */
    public Optional<Integer> getListenPortHTTPS() {
        if (this.server == null) {
            return Optional.empty();
        }
        return this.server.getListenPortHTTPS();
    }
}
