package co.cask.cdap.common.resource;

import co.cask.cdap.common.discovery.ResolvingDiscoverable;
import co.cask.cdap.common.utils.Networks;
import co.cask.cdap.common.zookeeper.coordination.BalancedAssignmentStrategy;
import co.cask.cdap.common.zookeeper.coordination.PartitionReplica;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinator;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinatorClient;
import co.cask.cdap.common.zookeeper.coordination.ResourceHandler;
import co.cask.cdap.common.zookeeper.coordination.ResourceRequirement;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.Services;
import org.apache.twill.internal.zookeeper.LeaderElection;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/common/resource/ResourceBalancerService.class */
/**
 * Base class for a service whose work is split into a fixed number of partitions that are
 * distributed across the running instances of the service.
 *
 * <p>On start-up an instance submits a {@link ResourceRequirement} for {@code partitionCount}
 * partitions, registers itself with the {@link DiscoveryService}, joins a {@link LeaderElection}
 * (the leader runs a {@link ResourceCoordinator} using a {@link BalancedAssignmentStrategy}) and
 * subscribes to assignment changes. Every time the assignment for this instance changes, the
 * currently running partition service (if any) is stopped and a new one is created through
 * {@link #createService(Set)} for the newly assigned partitions.
 */
public abstract class ResourceBalancerService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceBalancerService.class);

    private final String serviceName;
    private final int partitionCount;
    private final LeaderElection election;
    private final ResourceCoordinatorClient resourceClient;
    private final DiscoveryService discoveryService;
    /** Completed by the resource handler once it has finished (normally or with a failure). */
    private final SettableFuture<?> completion;
    private Cancellable cancelDiscoverable;
    private Cancellable cancelResourceHandler;

    /**
     * Creates the balancer. Nothing is started here; all services are started in {@link #startUp()}.
     *
     * @param str name of the service; also used as the ZooKeeper namespace ({@code "/" + str}),
     *     the election path argument, the resource requirement name and the discoverable name
     * @param i number of partitions requested for the service
     * @param zKClient ZooKeeper client; it is namespaced under {@code "/" + str} before use
     * @param discoveryService service used to announce this instance
     * @param discoveryServiceClient client handed to the {@link ResourceCoordinator} created when
     *     this instance becomes leader
     */
    protected ResourceBalancerService(String str, int i, ZKClient zKClient, DiscoveryService discoveryService, final DiscoveryServiceClient discoveryServiceClient) {
        this.serviceName = str;
        this.partitionCount = i;
        this.discoveryService = discoveryService;
        final ZKClient namespace = ZKClients.namespace(zKClient, "/" + str);
        this.election = new LeaderElection(namespace, str, new ElectionHandler() {
            private ResourceCoordinator coordinator;

            @Override
            public void leader() {
                // Only the elected leader runs the coordinator that computes assignments.
                this.coordinator = new ResourceCoordinator(namespace, discoveryServiceClient, new BalancedAssignmentStrategy());
                this.coordinator.startAndWait();
            }

            @Override
            public void follower() {
                // Stop coordinating if this instance was previously the leader.
                if (this.coordinator != null) {
                    this.coordinator.stopAndWait();
                    this.coordinator = null;
                }
            }
        });
        this.resourceClient = new ResourceCoordinatorClient(namespace);
        this.completion = SettableFuture.create();
    }

    /**
     * Creates the service that handles the given partitions. The returned service is started by
     * this class and stopped again before the next assignment is applied.
     *
     * @param set the partitions assigned to this instance; never empty when this method is called
     * @return a new, not yet started service for the given partitions
     */
    protected abstract Service createService(Set<Integer> set);

    /**
     * Submits the partition requirement, registers the discoverable, then starts the leader
     * election and the resource client and subscribes to assignment changes.
     */
    @Override
    protected void startUp() throws Exception {
        LOG.info("Starting ResourceBalancer {} service...", this.serviceName);
        // NOTE(review): presumably ("", partitionCount, 1) means an empty name prefix with one
        // replica per partition, which matches onChange() parsing names as integers; confirm
        // against ResourceRequirement.Builder.
        ResourceRequirement requirement =
            ResourceRequirement.builder(this.serviceName).addPartitions("", this.partitionCount, 1).build();
        // Block until the requirement has been submitted.
        this.resourceClient.submitRequirement(requirement).get();

        Discoverable discoverable = createDiscoverable(this.serviceName);
        this.cancelDiscoverable = this.discoveryService.register(ResolvingDiscoverable.of(discoverable));
        this.election.start();
        this.resourceClient.startAndWait();
        this.cancelResourceHandler = this.resourceClient.subscribe(this.serviceName, createResourceHandler(discoverable));
        LOG.info("Started ResourceBalancer {} service...", this.serviceName);
    }

    /**
     * Stops the election and resource client, cancels the resource handler (waiting for it to
     * finish) and finally removes the discoverable. Every step is attempted even if an earlier
     * one fails.
     *
     * @throws Exception the failure of the last step that failed, with the failures of earlier
     *     steps attached as suppressed exceptions
     */
    @Override
    protected void shutDown() throws Exception {
        LOG.info("Stopping ResourceBalancer {} service...", this.serviceName);
        Throwable failure = null;
        try {
            Services.chainStop(this.election, this.resourceClient).get();
        } catch (Throwable t) {
            failure = mergeFailure(failure, t);
            LOG.error("Exception while shutting down {}.", this.serviceName, t);
        }
        try {
            this.cancelResourceHandler.cancel();
            // Wait until the handler has stopped the partition service; rethrows its failure.
            this.completion.get();
        } catch (Throwable t) {
            failure = mergeFailure(failure, t);
            LOG.error("Exception while shutting down {}.", this.serviceName, t);
        }
        try {
            this.cancelDiscoverable.cancel();
        } catch (Throwable t) {
            failure = mergeFailure(failure, t);
            LOG.error("Exception while shutting down {}.", this.serviceName, t);
        }
        if (failure != null) {
            throw Throwables.propagate(failure);
        }
        LOG.info("Stopped ResourceBalancer {} service.", this.serviceName);
    }

    /**
     * Combines a newly caught shutdown failure with the one recorded so far so that neither is
     * lost: the latest failure stays the primary one and the earlier one is attached to it as a
     * suppressed exception.
     *
     * @param earlier failure recorded by a previous shutdown step, or {@code null}
     * @param latest failure that was just caught
     * @return {@code latest}
     */
    private static Throwable mergeFailure(Throwable earlier, Throwable latest) {
        // addSuppressed rejects null and self-suppression, so guard against both.
        if (earlier != null && earlier != latest) {
            latest.addSuppressed(earlier);
        }
        return latest;
    }

    /**
     * Creates the handler that reacts to partition assignment changes for this instance.
     *
     * @param discoverable the discoverable identifying this instance to the coordinator
     * @return a handler that restarts the partition service whenever the assignment changes
     */
    private ResourceHandler createResourceHandler(Discoverable discoverable) {
        return new ResourceHandler(discoverable) {
            private Service service;

            @Override
            public void onChange(Collection<PartitionReplica> collection) {
                // Partition names are parsed as integers to obtain the partition ids.
                Set<Integer> partitions = Sets.newHashSet();
                for (PartitionReplica replica : collection) {
                    partitions.add(Integer.valueOf(replica.getName()));
                }
                LOG.info("Partitions changed {}, service: {}", partitions, ResourceBalancerService.this.serviceName);
                try {
                    // Always stop the service handling the previous assignment first.
                    if (this.service != null) {
                        this.service.stopAndWait();
                    }
                    if (partitions.isEmpty() || !ResourceBalancerService.this.election.isRunning()) {
                        // Nothing assigned, or the balancer is going down: do not start a service.
                        this.service = null;
                    } else {
                        this.service = ResourceBalancerService.this.createService(partitions);
                        this.service.startAndWait();
                    }
                } catch (Throwable t) {
                    LOG.error("Failed to change partitions, service: {}.", ResourceBalancerService.this.serviceName, t);
                    // Record the failure for shutDown() and stop the whole balancer.
                    ResourceBalancerService.this.completion.setException(t);
                    ResourceBalancerService.this.stop();
                }
            }

            @Override
            public void finished(Throwable th) {
                try {
                    if (this.service != null) {
                        this.service.stopAndWait();
                        this.service = null;
                    }
                    ResourceBalancerService.this.completion.set(null);
                } catch (Throwable stopFailure) {
                    LOG.error("Exception when stopping service {}", this.service, stopFailure);
                    // Prefer the failure that ended the handler; keep the stop failure attached
                    // to it as a suppressed exception.
                    Throwable cause = th == null ? stopFailure : th;
                    if (cause != stopFailure) {
                        cause.addSuppressed(stopFailure);
                    }
                    // Report the combined cause rather than only the stop failure.
                    ResourceBalancerService.this.completion.setException(cause);
                }
            }
        };
    }

    /**
     * Builds the discoverable announced for this instance, using a port obtained from
     * {@link Networks#getRandomPort()}.
     *
     * @param str name under which the instance is announced
     * @return a discoverable bound to the local host address, or to the wildcard address when
     *     the local host cannot be resolved
     */
    private Discoverable createDiscoverable(String str) {
        int port = Networks.getRandomPort();
        InetSocketAddress address;
        try {
            address = new InetSocketAddress(InetAddress.getLocalHost(), port);
        } catch (UnknownHostException e) {
            // Local host name is not resolvable; fall back to the wildcard address.
            address = new InetSocketAddress(port);
        }
        return new Discoverable(str, address);
    }
}
