/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Standalone entry point wrapper around a {@link WorkerService} and its REST {@link WorkerServer}.
 *
 * <p>{@link #start()} initialises and starts the worker service, starts the REST server and then
 * waits on the {@link ErrorNotifier}. {@link #stop()} tears the pieces down again on a
 * best-effort basis.
 */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private final WorkerConfig workerConfig;
    private final WorkerService workerService;
    /** REST server; {@code null} until {@link #start()} has created it. */
    private WorkerServer server;
    // NOTE(review): nothing in this class submits work to this executor; it is only shut down
    // in stop(). Kept so that construction-time behaviour is unchanged.
    private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build();
    private PulsarResources pulsarResources;
    /** Metadata store backing authorization; {@code null} unless authorization is enabled. */
    private MetadataStoreExtended configMetadataStore;
    private final ErrorNotifier errorNotifier;

    /**
     * Creates a worker for the given configuration. The worker service is loaded through
     * {@link WorkerServiceLoader#load} and the default {@link ErrorNotifier} is used.
     *
     * @param workerConfig configuration used for the service, the server and authentication
     */
    public Worker(WorkerConfig workerConfig) {
        this.workerConfig = workerConfig;
        this.workerService = WorkerServiceLoader.load(workerConfig);
        this.errorNotifier = ErrorNotifier.getDefaultImpl();
    }

    /**
     * Initialises and starts the worker service, starts the REST server and then calls
     * {@link ErrorNotifier#waitForError()}.
     *
     * @throws Exception if any start-up step fails, or whatever {@code waitForError()} throws
     *     (it is logged and rethrown unchanged)
     */
    protected void start() throws Exception {
        this.workerService.initAsStandalone(this.workerConfig);
        // NOTE(review): getAuthenticationService() builds a new instance on every call, so the
        // service and the server each receive their own. Confirm before sharing a single one.
        this.workerService.start(this.getAuthenticationService(), this.getAuthorizationService(), this.errorNotifier);
        this.server = new WorkerServer(this.workerService, this.getAuthenticationService());
        this.server.start();
        log.info("/** Started worker server on port={} **/", this.workerConfig.getWorkerPort());
        try {
            this.errorNotifier.waitForError();
        } catch (Throwable th) {
            log.error("!-- Fatal error encountered. Worker will exit now. --!", th);
            throw th;
        }
    }

    /**
     * Builds the authorization service when authorization is enabled in the worker config.
     * As a side effect this creates {@link #configMetadataStore} and {@link #pulsarResources}.
     *
     * @return a new {@link AuthorizationService}, or {@code null} when authorization is disabled
     * @throws PulsarServerException if creating the metadata store fails with an {@link IOException}
     */
    private AuthorizationService getAuthorizationService() throws PulsarServerException {
        if (!this.workerConfig.isAuthorizationEnabled()) {
            return null;
        }
        log.info("starting configuration cache service");
        try {
            this.configMetadataStore = PulsarResources.createMetadataStore(
                    this.workerConfig.getConfigurationMetadataStoreUrl(),
                    (int) this.workerConfig.getMetadataStoreSessionTimeoutMillis(),
                    this.workerConfig.isZooKeeperAllowReadOnlyOperations());
        } catch (IOException e) {
            throw new PulsarServerException(e);
        }
        this.pulsarResources = new PulsarResources(null, this.configMetadataStore);
        return new AuthorizationService(this.getServiceConfiguration(), this.pulsarResources);
    }

    /**
     * Creates a new {@link AuthenticationService} from the converted worker configuration.
     *
     * @return a freshly constructed authentication service
     * @throws PulsarServerException if the authentication service cannot be constructed
     */
    private AuthenticationService getAuthenticationService() throws PulsarServerException {
        return new AuthenticationService(this.getServiceConfiguration());
    }

    /**
     * Stops the REST server, the worker service, the metadata store and the executor.
     *
     * <p>Every step is attempted even if an earlier one throws an {@link Exception}; failures
     * are logged as warnings and never propagated.
     */
    protected void stop() {
        if (this.server != null) {
            try {
                this.server.stop();
            } catch (Exception e) {
                // Keep going: the worker service must still be stopped below.
                log.warn("Failed to gracefully stop worker server ", e);
            }
        }
        try {
            this.workerService.stop();
        } catch (Exception e) {
            log.warn("Failed to gracefully stop worker service ", e);
        }
        if (this.configMetadataStore != null) {
            try {
                this.configMetadataStore.close();
            } catch (Exception e) {
                log.warn("Failed to close global zk cache ", e);
            }
        }
        // The executor field is final and initialised inline, so it is never null here.
        this.orderedExecutor.shutdownNow();
    }

    /**
     * Returns the HTTP port reported by the REST server.
     *
     * @return the server's HTTP listen port, or an empty {@link Optional} if the server has not
     *     been created yet (i.e. {@link #start()} has not reached that point)
     */
    public Optional<Integer> getListenPortHTTP() {
        WorkerServer current = this.server;
        return current == null ? Optional.empty() : current.getListenPortHTTP();
    }

    /**
     * Returns the HTTPS port reported by the REST server.
     *
     * @return the server's HTTPS listen port, or an empty {@link Optional} if the server has not
     *     been created yet (i.e. {@link #start()} has not reached that point)
     */
    public Optional<Integer> getListenPortHTTPS() {
        WorkerServer current = this.server;
        return current == null ? Optional.empty() : current.getListenPortHTTPS();
    }

    /**
     * Converts the worker configuration into a broker {@link ServiceConfiguration} and sets its
     * cluster name to the configured Pulsar Functions cluster.
     *
     * @return a new service configuration derived from {@link #workerConfig}
     */
    private ServiceConfiguration getServiceConfiguration() {
        ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(this.workerConfig);
        serviceConfiguration.setClusterName(this.workerConfig.getPulsarFunctionsCluster());
        return serviceConfiguration;
    }
}

