/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket;

import java.io.Closeable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.javax.servlet.ServletException;
import org.apache.pulsar.shade.javax.websocket.DeploymentException;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.websocket.ConsumerHandler;
import org.apache.pulsar.websocket.ProducerHandler;
import org.apache.pulsar.websocket.ReaderHandler;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.stats.ProxyStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the shared state of the Pulsar WebSocket proxy: the lazily created {@link PulsarClient},
 * the authentication/authorization services, the optional configuration metadata store, a
 * scheduled executor, and per-topic registries of the active producer, consumer and reader
 * handlers.
 *
 * <p>Call {@link #start()} before use and {@link #close()} to release the resources.
 */
public class WebSocketService implements Closeable {
    AuthenticationService authenticationService;
    AuthorizationService authorizationService;
    PulsarClient pulsarClient;
    private final ScheduledExecutorService executor;
    private PulsarResources pulsarResources;
    private MetadataStoreExtended configMetadataStore;
    private ServiceConfiguration config;
    private ClusterData localCluster;
    // Handler registries, keyed by topic name.
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ConsumerHandler>> topicConsumerMap;
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> topicReaderMap;
    private final ProxyStats proxyStats;
    private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);

    /**
     * Creates the service from a standalone proxy configuration. The cluster data is derived from
     * the service URLs in {@code config}; it is {@code null} when none of them is set, in which
     * case it is looked up from the metadata store on the first {@link #getPulsarClient()} call.
     *
     * @param config the WebSocket proxy configuration
     */
    public WebSocketService(WebSocketProxyConfiguration config) {
        this(WebSocketService.createClusterData(config), PulsarConfigurationLoader.convertFrom(config));
    }

    /**
     * Creates the service for the given cluster and service configuration.
     *
     * @param localCluster connection data of the local cluster; may be {@code null}
     * @param config       the service configuration; also determines the executor's thread count
     */
    public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
        this.config = config;
        this.executor = Executors.newScheduledThreadPool(
                config.getWebSocketNumServiceThreads(), new DefaultThreadFactory("pulsar-websocket"));
        this.localCluster = localCluster;
        // Explicit type arguments are required: in a chained call the builder's type parameters
        // cannot be inferred from the assignment target.
        this.topicProducerMap =
                ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<ProducerHandler>>newBuilder().build();
        this.topicConsumerMap =
                ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<ConsumerHandler>>newBuilder().build();
        this.topicReaderMap =
                ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<ReaderHandler>>newBuilder().build();
        this.proxyStats = new ProxyStats(this);
    }

    /**
     * Initializes the metadata store (when a configuration metadata store URL is set), the
     * authorization service (when authorization is enabled) and the authentication service.
     *
     * @throws PulsarServerException if the metadata store cannot be created, or if authorization
     *                               is enabled but no configuration metadata store is configured
     */
    public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException, DeploymentException {
        String metadataStoreUrl = this.config.getConfigurationMetadataStoreUrl();
        if (StringUtils.isNotBlank(metadataStoreUrl)) {
            try {
                // The session timeout is narrowed to int, as createMetadataStore expects.
                this.configMetadataStore = this.createMetadataStore(
                        metadataStoreUrl, (int) this.config.getMetadataStoreSessionTimeoutMillis());
            } catch (MetadataStoreException e) {
                throw new PulsarServerException(e);
            }
            this.pulsarResources = new PulsarResources(null, this.configMetadataStore);
        }
        if (this.config.isAuthorizationEnabled()) {
            if (this.pulsarResources == null) {
                throw new PulsarServerException("Failed to initialize authorization manager due to empty ConfigurationStoreServers");
            }
            this.authorizationService = new AuthorizationService(this.config, this.pulsarResources);
        }
        this.authenticationService = new AuthenticationService(this.config);
        log.info("Pulsar WebSocket Service started");
    }

    /**
     * Creates the metadata store through {@link PulsarResources#createMetadataStore}.
     *
     * @param serverUrls       the metadata store URL(s)
     * @param sessionTimeoutMs the session timeout in milliseconds
     * @return the created metadata store
     * @throws MetadataStoreException if the store cannot be created
     */
    public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs) throws MetadataStoreException {
        return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs);
    }

    /**
     * Closes the Pulsar client, the authentication service and the metadata store, then shuts
     * down the executor.
     *
     * <p>An {@link IOException} from one resource does not prevent the remaining resources from
     * being closed, and the executor is shut down even if closing fails. The first failure is
     * rethrown once everything has been attempted; later failures are attached to it as
     * suppressed exceptions.
     *
     * @throws IOException if closing any of the resources failed
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        try {
            if (this.pulsarClient != null) {
                try {
                    this.pulsarClient.close();
                } catch (IOException e) {
                    failure = WebSocketService.mergeFailure(failure, e);
                }
            }
            if (this.authenticationService != null) {
                try {
                    this.authenticationService.close();
                } catch (IOException e) {
                    failure = WebSocketService.mergeFailure(failure, e);
                }
            }
            if (this.configMetadataStore != null) {
                try {
                    this.configMetadataStore.close();
                } catch (Exception e) {
                    // Any metadata store failure is reported as an IOException wrapping the cause.
                    failure = WebSocketService.mergeFailure(failure, new IOException(e));
                }
            }
        } finally {
            // Always release the thread pool, even if a close above threw unexpectedly.
            this.executor.shutdown();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Keeps the first close failure as primary and attaches any later one as suppressed. */
    private static IOException mergeFailure(IOException first, IOException next) {
        if (first == null || first == next) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /** Returns the authentication service, or {@code null} if it has not been created yet. */
    public AuthenticationService getAuthenticationService() {
        return this.authenticationService;
    }

    /** Returns the authorization service, or {@code null} if it has not been created. */
    public AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    /**
     * Returns the shared Pulsar client, creating it on first use. If no cluster data is known yet
     * it is first retrieved from the configuration metadata store.
     *
     * @return the shared client instance
     * @throws IOException if the cluster data cannot be retrieved or the client cannot be built
     */
    public synchronized PulsarClient getPulsarClient() throws IOException {
        if (this.pulsarClient == null) {
            if (this.localCluster == null) {
                this.localCluster = this.retrieveClusterData();
            }
            this.pulsarClient = this.createClientInstance(this.localCluster);
        }
        return this.pulsarClient;
    }

    /** Replaces the cluster data used when the Pulsar client is created. */
    public synchronized void setLocalCluster(ClusterData clusterData) {
        this.localCluster = clusterData;
    }

    /**
     * Builds a Pulsar client for the given cluster from the TLS, authentication, I/O thread and
     * connection settings of the service configuration.
     *
     * <p>With broker-client TLS enabled the TLS broker service URL is preferred over the TLS
     * service URL; otherwise the plain broker service URL is preferred over the plain service URL.
     */
    private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
        ClientBuilder clientBuilder = PulsarClient.builder()
                .statsInterval(0L, TimeUnit.SECONDS)
                .enableTls(this.config.isTlsEnabled())
                .allowTlsInsecureConnection(this.config.isTlsAllowInsecureConnection())
                .tlsTrustCertsFilePath(this.config.getBrokerClientTrustCertsFilePath())
                .ioThreads(this.config.getWebSocketNumIoThreads())
                .connectionsPerBroker(this.config.getWebSocketConnectionsPerBroker());

        // Authentication is only configured when both the plugin and its parameters are given.
        String authPlugin = this.config.getBrokerClientAuthenticationPlugin();
        String authParams = this.config.getBrokerClientAuthenticationParameters();
        if (StringUtils.isNotBlank(authPlugin) && StringUtils.isNotBlank(authParams)) {
            clientBuilder.authentication(authPlugin, authParams);
        }

        if (this.config.isBrokerClientTlsEnabled()) {
            // NOTE(review): if neither TLS URL is set, no service URL is configured on the
            // builder here; confirm how build() behaves in that case.
            if (StringUtils.isNotBlank(clusterData.getBrokerServiceUrlTls())) {
                clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls());
            } else if (StringUtils.isNotBlank(clusterData.getServiceUrlTls())) {
                clientBuilder.serviceUrl(clusterData.getServiceUrlTls());
            }
        } else if (StringUtils.isNotBlank(clusterData.getBrokerServiceUrl())) {
            clientBuilder.serviceUrl(clusterData.getBrokerServiceUrl());
        } else {
            clientBuilder.serviceUrl(clusterData.getServiceUrl());
        }
        return clientBuilder.build();
    }

    /**
     * Derives cluster data from the proxy configuration.
     *
     * @return cluster data carrying all four URLs when a broker service URL (plain or TLS) is
     *         set; cluster data carrying only the service URLs when just those are set;
     *         {@code null} when no URL is configured
     */
    private static ClusterData createClusterData(WebSocketProxyConfiguration config) {
        if (StringUtils.isNotBlank(config.getBrokerServiceUrl()) || StringUtils.isNotBlank(config.getBrokerServiceUrlTls())) {
            return ClusterData.builder()
                    .serviceUrl(config.getServiceUrl())
                    .serviceUrlTls(config.getServiceUrlTls())
                    .brokerServiceUrl(config.getBrokerServiceUrl())
                    .brokerServiceUrlTls(config.getBrokerServiceUrlTls())
                    .build();
        }
        if (StringUtils.isNotBlank(config.getServiceUrl()) || StringUtils.isNotBlank(config.getServiceUrlTls())) {
            return ClusterData.builder()
                    .serviceUrl(config.getServiceUrl())
                    .serviceUrlTls(config.getServiceUrlTls())
                    .build();
        }
        return null;
    }

    /**
     * Looks up the configured cluster in the configuration metadata store and caches the result
     * in {@code localCluster}.
     *
     * @throws PulsarServerException if no metadata store is configured, the cluster is not found,
     *                               or the lookup fails
     */
    private ClusterData retrieveClusterData() throws PulsarServerException {
        if (this.pulsarResources == null) {
            throw new PulsarServerException("Failed to retrieve Cluster data due to empty ConfigurationStoreServers");
        }
        try {
            this.localCluster = this.pulsarResources.getClusterResources()
                    .getCluster(this.config.getClusterName())
                    .orElseThrow(() -> new MetadataStoreException.NotFoundException("Cluster " + this.config.getClusterName()));
            return this.localCluster;
        } catch (Exception e) {
            throw new PulsarServerException(e);
        }
    }

    /** Returns the proxy statistics collector created for this service. */
    public ProxyStats getProxyStats() {
        return this.proxyStats;
    }

    /** Returns the scheduled executor owned by this service. */
    public ScheduledExecutorService getExecutor() {
        return this.executor;
    }

    /** Returns whether authentication is enabled; {@code false} when no configuration is set. */
    public boolean isAuthenticationEnabled() {
        return this.config != null && this.config.isAuthenticationEnabled();
    }

    /** Returns whether authorization is enabled; {@code false} when no configuration is set. */
    public boolean isAuthorizationEnabled() {
        return this.config != null && this.config.isAuthorizationEnabled();
    }

    /**
     * Registers a producer handler under its topic, creating the topic's handler set if needed.
     *
     * @return the result of adding the handler to the topic's set
     */
    public boolean addProducer(ProducerHandler producer) {
        return this.topicProducerMap
                .computeIfAbsent(producer.getProducer().getTopic(),
                        topic -> ConcurrentOpenHashSet.<ProducerHandler>newBuilder().build())
                .add(producer);
    }

    /** Returns the live registry of producer handlers, keyed by topic. */
    public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> getProducers() {
        return this.topicProducerMap;
    }

    /**
     * Unregisters a producer handler from its topic.
     *
     * @return {@code false} if the topic has no handler set; otherwise the result of removing the
     *         handler from that set
     */
    public boolean removeProducer(ProducerHandler producer) {
        ConcurrentOpenHashSet<ProducerHandler> handlers =
                this.topicProducerMap.get(producer.getProducer().getTopic());
        return handlers != null && handlers.remove(producer);
    }

    /**
     * Registers a consumer handler under its topic, creating the topic's handler set if needed.
     *
     * @return the result of adding the handler to the topic's set
     */
    public boolean addConsumer(ConsumerHandler consumer) {
        return this.topicConsumerMap
                .computeIfAbsent(consumer.getConsumer().getTopic(),
                        topic -> ConcurrentOpenHashSet.<ConsumerHandler>newBuilder().build())
                .add(consumer);
    }

    /** Returns the live registry of consumer handlers, keyed by topic. */
    public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ConsumerHandler>> getConsumers() {
        return this.topicConsumerMap;
    }

    /**
     * Unregisters a consumer handler from its topic.
     *
     * @return {@code false} if the topic has no handler set; otherwise the result of removing the
     *         handler from that set
     */
    public boolean removeConsumer(ConsumerHandler consumer) {
        ConcurrentOpenHashSet<ConsumerHandler> handlers =
                this.topicConsumerMap.get(consumer.getConsumer().getTopic());
        return handlers != null && handlers.remove(consumer);
    }

    /**
     * Registers a reader handler under its topic, creating the topic's handler set if needed.
     *
     * @return the result of adding the handler to the topic's set
     */
    public boolean addReader(ReaderHandler reader) {
        return this.topicReaderMap
                .computeIfAbsent(reader.getConsumer().getTopic(),
                        topic -> ConcurrentOpenHashSet.<ReaderHandler>newBuilder().build())
                .add(reader);
    }

    /** Returns the live registry of reader handlers, keyed by topic. */
    public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> getReaders() {
        return this.topicReaderMap;
    }

    /**
     * Unregisters a reader handler from its topic.
     *
     * @return {@code false} if the topic has no handler set; otherwise the result of removing the
     *         handler from that set
     */
    public boolean removeReader(ReaderHandler reader) {
        ConcurrentOpenHashSet<ReaderHandler> handlers =
                this.topicReaderMap.get(reader.getConsumer().getTopic());
        return handlers != null && handlers.remove(reader);
    }

    /** Returns the service configuration this instance was created with. */
    public ServiceConfiguration getConfig() {
        return this.config;
    }
}

