package co.cask.cdap.metrics.runtime;

import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.discovery.ResolvingDiscoverable;
import co.cask.cdap.common.utils.Networks;
import co.cask.cdap.common.zookeeper.coordination.BalancedAssignmentStrategy;
import co.cask.cdap.common.zookeeper.coordination.PartitionReplica;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinator;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinatorClient;
import co.cask.cdap.common.zookeeper.coordination.ResourceHandler;
import co.cask.cdap.common.zookeeper.coordination.ResourceRequirement;
import co.cask.cdap.metrics.process.KafkaMetricsProcessorServiceFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.Services;
import org.apache.twill.internal.zookeeper.LeaderElection;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metrics/runtime/KafkaMetricsProcessorService.class */
public final class KafkaMetricsProcessorService extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsProcessorService.class);
    private static final String SERVICE_NAME = "metrics.processor.consumer";
    private final CConfiguration conf;
    private final LeaderElection election;
    private final ResourceCoordinatorClient resourceClient;
    private final DiscoveryService discoveryService;
    private final KafkaMetricsProcessorServiceFactory metricsProcessorFactory;
    private final SettableFuture<?> completion = SettableFuture.create();
    private Cancellable cancelDiscoverable;
    private Cancellable cancelResourceHandler;

    @Nullable
    private MetricsContext metricsContext;

    @Inject
    public KafkaMetricsProcessorService(CConfiguration cConfiguration, final ZKClientService zKClientService, DiscoveryService discoveryService, final DiscoveryServiceClient discoveryServiceClient, KafkaMetricsProcessorServiceFactory kafkaMetricsProcessorServiceFactory) {
        this.conf = cConfiguration;
        this.metricsProcessorFactory = kafkaMetricsProcessorServiceFactory;
        this.discoveryService = discoveryService;
        this.election = new LeaderElection(zKClientService, SERVICE_NAME, new ElectionHandler() { // from class: co.cask.cdap.metrics.runtime.KafkaMetricsProcessorService.1
            private ResourceCoordinator coordinator;

            public void leader() {
                this.coordinator = new ResourceCoordinator(zKClientService, discoveryServiceClient, new BalancedAssignmentStrategy());
                this.coordinator.startAndWait();
            }

            public void follower() {
                if (this.coordinator != null) {
                    this.coordinator.stopAndWait();
                    this.coordinator = null;
                }
            }
        });
        this.resourceClient = new ResourceCoordinatorClient(zKClientService);
    }

    public void setMetricsContext(MetricsContext metricsContext) {
        this.metricsContext = metricsContext;
    }

    protected Executor executor() {
        return new Executor() { // from class: co.cask.cdap.metrics.runtime.KafkaMetricsProcessorService.2
            @Override // java.util.concurrent.Executor
            public void execute(Runnable runnable) {
                Thread thread = new Thread(runnable, KafkaMetricsProcessorService.this.getServiceName());
                thread.setDaemon(true);
                thread.start();
            }
        };
    }

    protected void startUp() throws Exception {
        LOG.info("Starting Metrics Processor ...");
        Discoverable createDiscoverable = createDiscoverable(SERVICE_NAME);
        this.cancelDiscoverable = this.discoveryService.register(ResolvingDiscoverable.of(createDiscoverable));
        this.election.start();
        this.resourceClient.startAndWait();
        this.resourceClient.submitRequirement(ResourceRequirement.builder(SERVICE_NAME).addPartitions("", this.conf.getInt("metrics.kafka.partition.size", 1), 1).build()).get();
        this.cancelResourceHandler = this.resourceClient.subscribe(SERVICE_NAME, createResourceHandler(this.metricsProcessorFactory, createDiscoverable));
    }

    protected void run() throws Exception {
        this.completion.get();
    }

    protected void triggerShutdown() {
        this.completion.set((Object) null);
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping Metrics Processor ...");
        Throwable th = null;
        try {
            Services.chainStop(this.election, new Service[]{this.resourceClient}).get();
        } catch (Throwable th2) {
            th = th2;
            LOG.error("Exception while shutting down.", th2);
        }
        try {
            this.cancelResourceHandler.cancel();
        } catch (Throwable th3) {
            th = th3;
            LOG.error("Exception while shutting down.", th3);
        }
        try {
            this.cancelDiscoverable.cancel();
        } catch (Throwable th4) {
            th = th4;
            LOG.error("Exception while shutting down.", th4);
        }
        if (th != null) {
            throw Throwables.propagate(th);
        }
    }

    private ResourceHandler createResourceHandler(final KafkaMetricsProcessorServiceFactory kafkaMetricsProcessorServiceFactory, Discoverable discoverable) {
        return new ResourceHandler(discoverable) { // from class: co.cask.cdap.metrics.runtime.KafkaMetricsProcessorService.3
            private co.cask.cdap.metrics.process.KafkaMetricsProcessorService service;

            public void onChange(Collection<PartitionReplica> collection) {
                HashSet newHashSet = Sets.newHashSet();
                Iterator<PartitionReplica> it = collection.iterator();
                while (it.hasNext()) {
                    newHashSet.add(Integer.valueOf(it.next().getName()));
                }
                KafkaMetricsProcessorService.LOG.info("Metrics Kafka partition changed {}", newHashSet);
                try {
                    if (this.service != null) {
                        this.service.stopAndWait();
                    }
                    if (newHashSet.isEmpty() || !KafkaMetricsProcessorService.this.election.isRunning()) {
                        this.service = null;
                    } else {
                        this.service = kafkaMetricsProcessorServiceFactory.create(newHashSet);
                        this.service.setMetricsContext(KafkaMetricsProcessorService.this.metricsContext);
                        this.service.startAndWait();
                    }
                } catch (Throwable th) {
                    KafkaMetricsProcessorService.LOG.error("Failed to change Kafka partition.", th);
                    KafkaMetricsProcessorService.this.completion.setException(th);
                }
            }

            public void finished(Throwable th) {
                if (this.service != null) {
                    this.service.stopAndWait();
                    this.service = null;
                }
            }
        };
    }

    private Discoverable createDiscoverable(final String str) {
        InetSocketAddress inetSocketAddress;
        int randomPort = Networks.getRandomPort();
        try {
            inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), randomPort);
        } catch (UnknownHostException e) {
            inetSocketAddress = new InetSocketAddress(randomPort);
        }
        final InetSocketAddress inetSocketAddress2 = inetSocketAddress;
        return new Discoverable() { // from class: co.cask.cdap.metrics.runtime.KafkaMetricsProcessorService.4
            public String getName() {
                return str;
            }

            public InetSocketAddress getSocketAddress() {
                return inetSocketAddress2;
            }
        };
    }
}
