package co.cask.cdap.common.zookeeper.coordination;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.zookeeper.ZKExtOperations;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.class */
public final class ResourceCoordinatorClient extends AbstractService {
    /** Logger used by this client and by its nested callbacks. */
    private static final Logger LOG = LoggerFactory.getLogger(ResourceCoordinatorClient.class);
    /**
     * Converts the payload of a ZooKeeper node into a {@link ResourceRequirement} using
     * {@code CoordinationConstants.RESOURCE_REQUIREMENT_CODEC}. A {@code null} input yields {@code null};
     * any decoding failure is logged together with the raw bytes and then rethrown via
     * {@link Throwables#propagate(Throwable)}.
     */
    private static final Function<NodeData, ResourceRequirement> NODE_DATA_TO_REQUIREMENT = new Function<NodeData, ResourceRequirement>() { // from class: co.cask.cdap.common.zookeeper.coordination.ResourceCoordinatorClient.1
        @Override // com.google.common.base.Function
        public ResourceRequirement apply(@Nullable NodeData nodeData) {
            // No node data means there is nothing to decode.
            if (nodeData == null) {
                return null;
            }
            try {
                return CoordinationConstants.RESOURCE_REQUIREMENT_CODEC.decode(nodeData.getData());
            } catch (Throwable th) {
                // Log the undecodable bytes before propagating so the failure can be diagnosed.
                ResourceCoordinatorClient.LOG.error("Failed to decode resource requirement: {}", Bytes.toStringBinary(nodeData.getData()), th);
                throw Throwables.propagate(th);
            }
        }
    };
    /** ZooKeeper client through which all requirement and assignment nodes are accessed. */
    private final ZKClient zkClient;
    /** Registered listener wrappers, keyed by service name; the visible accesses hold this instance's monitor. */
    private final Multimap<String, AssignmentChangeListener> changeListeners = LinkedHashMultimap.create();
    /** Names of services for which subscribe() has already started an assignment watch. */
    private final Set<String> serviceWatched = Sets.newHashSet();
    /** Most recently received non-empty assignment per service name (empty assignments are removed). */
    private final Map<String, ResourceAssignment> assignments = Maps.newHashMap();
    /** Executor on which listener callbacks run; assigned in doStart(), so it is null before the service starts. */
    private ExecutorService handlerExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/common/zookeeper/coordination/ResourceCoordinatorClient$AssignmentChangeListenerCaller.class */
    /**
     * Listener wrapper that hands every callback of the wrapped listener to the client's
     * handler executor instead of invoking it on the calling thread.
     */
    public final class AssignmentChangeListenerCaller implements AssignmentChangeListener {
        /** Service name under which this wrapper is registered in the listener multimap. */
        private final String service;
        /** The listener that ultimately receives the callbacks. */
        private final AssignmentChangeListener delegate;

        private AssignmentChangeListenerCaller(String str, AssignmentChangeListener assignmentChangeListener) {
            this.service = str;
            this.delegate = assignmentChangeListener;
        }

        /**
         * Schedules {@code delegate.onChange(resourceAssignment)} on the handler executor.
         *
         * @param resourceAssignment the assignment to deliver
         */
        @Override
        public void onChange(final ResourceAssignment resourceAssignment) {
            Runnable deliverChange = new Runnable() {
                @Override
                public void run() {
                    delegate.onChange(resourceAssignment);
                }
            };
            ResourceCoordinatorClient.this.handlerExecutor.execute(deliverChange);
        }

        /**
         * Unregisters this wrapper and, only if it was still registered, schedules
         * {@code delegate.finished(th)} on the handler executor. Repeated calls therefore
         * notify the delegate at most once.
         *
         * @param th the failure cause to pass on, or {@code null}
         */
        @Override
        public void finished(final Throwable th) {
            synchronized (ResourceCoordinatorClient.this) {
                boolean wasRegistered = ResourceCoordinatorClient.this.changeListeners.remove(this.service, this);
                if (!wasRegistered) {
                    // Already finished or cancelled earlier; nothing to deliver.
                    return;
                }
                Runnable deliverFinished = new Runnable() {
                    @Override
                    public void run() {
                        delegate.finished(th);
                    }
                };
                ResourceCoordinatorClient.this.handlerExecutor.execute(deliverFinished);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/common/zookeeper/coordination/ResourceCoordinatorClient$AssignmentListenerCancellable.class */
    /**
     * {@link Cancellable} handed back to subscribers; cancelling it finishes the wrapped
     * listener caller with no failure cause.
     */
    private static final class AssignmentListenerCancellable implements Cancellable {
        /** The listener wrapper whose subscription this handle controls. */
        private final AssignmentChangeListenerCaller caller;

        private AssignmentListenerCancellable(AssignmentChangeListenerCaller assignmentChangeListenerCaller) {
            caller = assignmentChangeListenerCaller;
        }

        /** Ends the subscription by calling {@code finished(null)} on the listener wrapper. */
        public void cancel() {
            caller.finished(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/common/zookeeper/coordination/ResourceCoordinatorClient$AssignmentWatcher.class */
    /**
     * ZooKeeper watcher for one service's assignment node. It reacts only to the event
     * types it was created with and either re-reads the assignment or drops the service
     * from the watched set when nobody is listening anymore.
     */
    public final class AssignmentWatcher implements Watcher {
        /** Name of the service whose assignment node is being watched. */
        private final String serviceName;
        /** Event types that trigger a reaction; all other events are ignored. */
        private final EnumSet<Watcher.Event.EventType> actOnTypes;

        AssignmentWatcher(String str, EnumSet<Watcher.Event.EventType> enumSet) {
            this.serviceName = str;
            this.actOnTypes = enumSet;
        }

        /**
         * Handles a watch event for the service's assignment node.
         *
         * @param watchedEvent the event delivered by ZooKeeper
         */
        public void process(WatchedEvent watchedEvent) {
            boolean relevant = this.actOnTypes.contains(watchedEvent.getType());
            if (!relevant) {
                return;
            }
            synchronized (ResourceCoordinatorClient.this) {
                boolean hasListeners = ResourceCoordinatorClient.this.changeListeners.containsKey(this.serviceName);
                if (!hasListeners) {
                    // Nobody is interested anymore: forget the service so a later subscribe starts a fresh watch.
                    ResourceCoordinatorClient.this.serviceWatched.remove(this.serviceName);
                    return;
                }
                ResourceCoordinatorClient.this.watchAssignment(this.serviceName);
            }
        }
    }

    /**
     * Creates a client that performs all of its ZooKeeper operations through the given client.
     *
     * @param zKClient the ZooKeeper client to use; stored as-is without validation
     */
    public ResourceCoordinatorClient(ZKClient zKClient) {
        this.zkClient = zKClient;
    }

    /**
     * Writes the given requirement to {@code /requirements/<name>} via
     * {@code ZKExtOperations.createOrSet}, passing 10 as the final argument.
     *
     * @param resourceRequirement the requirement to store
     * @return the future returned by {@code createOrSet}, or an immediately failed future if
     *         building the path, encoding the requirement or issuing the call throws
     */
    public ListenableFuture<ResourceRequirement> submitRequirement(ResourceRequirement resourceRequirement) {
        ListenableFuture<ResourceRequirement> outcome;
        try {
            String requirementPath = "/requirements/" + resourceRequirement.getName();
            outcome = ZKExtOperations.createOrSet(this.zkClient, requirementPath, CoordinationConstants.RESOURCE_REQUIREMENT_CODEC.encode(resourceRequirement), resourceRequirement, 10);
        } catch (Exception e) {
            // Report synchronous failures through the future rather than by throwing.
            outcome = Futures.immediateFailedFuture(e);
        }
        return outcome;
    }

    /**
     * Applies the given modifier to the requirement stored at {@code /requirements/<str>} by
     * delegating to {@code ZKExtOperations.updateOrCreate} with the requirement codec.
     *
     * @param str the requirement name
     * @param resourceModifier the modifier handed to {@code updateOrCreate}
     * @return the future returned by {@code updateOrCreate}
     */
    public ListenableFuture<ResourceRequirement> modifyRequirement(String str, ResourceModifier resourceModifier) {
        String requirementPath = "/requirements/" + str;
        return ZKExtOperations.updateOrCreate(this.zkClient, requirementPath, resourceModifier, CoordinationConstants.RESOURCE_REQUIREMENT_CODEC);
    }

    /**
     * Reads the requirement stored at {@code /requirements/<str>}.
     *
     * @param str the requirement name
     * @return a future of the decoded requirement; a {@code NoNodeException} from the read is
     *         replaced by a {@code null} node, which decodes to {@code null}
     */
    public ListenableFuture<ResourceRequirement> fetchRequirement(String str) {
        String requirementPath = "/requirements/" + str;
        // Raw types are kept exactly as in the original call chain.
        ListenableFuture nodeDataOrNull = (ListenableFuture) ZKOperations.ignoreError(this.zkClient.getData(requirementPath), KeeperException.NoNodeException.class, (Object) null);
        return Futures.transform(nodeDataOrNull, (Function) NODE_DATA_TO_REQUIREMENT);
    }

    /**
     * Deletes the requirement node at {@code /requirements/<str>}.
     *
     * @param str the requirement name
     * @return a future that yields {@code str}; a {@code NoNodeException} from the delete is
     *         ignored and also yields {@code str}
     */
    public ListenableFuture<String> deleteRequirement(String str) {
        String requirementPath = "/requirements/" + str;
        // Raw type is kept exactly as in the original call chain.
        ListenableFuture deletion = (ListenableFuture) ZKOperations.ignoreError(this.zkClient.delete(requirementPath), KeeperException.NoNodeException.class, str);
        return Futures.transform(deletion, Functions.constant(str));
    }

    /**
     * Registers a listener for assignment changes of the given service.
     *
     * <p>If the service is not yet being watched, the listener is registered and a watch on the
     * assignment node is started. Otherwise the cached assignment, when present and non-empty,
     * is delivered to the new listener before it is registered.
     *
     * @param str the service name
     * @param assignmentChangeListener the listener to notify
     * @return a handle whose {@code cancel()} finishes the listener with a {@code null} cause
     */
    public synchronized Cancellable subscribe(String str, AssignmentChangeListener assignmentChangeListener) {
        AssignmentChangeListenerCaller caller = new AssignmentChangeListenerCaller(str, assignmentChangeListener);

        boolean newlyWatched = this.serviceWatched.add(str);
        if (newlyWatched) {
            // Register before watching so the listener is in place when the first result is handled.
            this.changeListeners.put(str, caller);
            watchAssignment(str);
            return new AssignmentListenerCancellable(caller);
        }

        ResourceAssignment cached = this.assignments.get(str);
        boolean hasCachedAssignment = cached != null && !cached.getAssignments().isEmpty();
        if (hasCachedAssignment) {
            caller.onChange(cached);
        }
        this.changeListeners.put(str, caller);
        return new AssignmentListenerCancellable(caller);
    }

    /**
     * Starts the service: creates the single-threaded executor on which listener callbacks run,
     * using a daemon thread factory created with the name "resource-coordinator-client", then
     * reports the service as started.
     */
    @Override // com.google.common.util.concurrent.AbstractService
    protected void doStart() {
        this.handlerExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("resource-coordinator-client"));
        notifyStarted();
    }

    /**
     * Stops the service: finishes every registered listener with a {@code null} cause, reports
     * the service as stopped, and always shuts the handler executor down afterwards, even when
     * one of the earlier steps throws.
     */
    @Override // com.google.common.util.concurrent.AbstractService
    protected void doStop() {
        try {
            finishHandlers(null);
            notifyStopped();
        } finally {
            // Single cleanup path: runs exactly once whether or not the steps above succeeded.
            this.handlerExecutor.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails the service: finishes every registered listener with the given cause, then always
     * shuts the handler executor down and reports the failure to the service framework, even
     * when finishing the listeners throws.
     *
     * @param th the failure that caused the service to fail
     */
    public void doNotifyFailed(Throwable th) {
        try {
            finishHandlers(th);
        } finally {
            // Single cleanup path: shutdown and failure notification run once, in this order.
            this.handlerExecutor.shutdown();
            notifyFailed(th);
        }
    }

    /**
     * Calls {@code finished(th)} on every currently registered listener.
     *
     * @param th the failure cause to pass to each listener, or {@code null}
     */
    private synchronized void finishHandlers(Throwable th) {
        // Iterate over a snapshot: each registered listener's finished() removes itself from
        // changeListeners on this same thread, and mutating the multimap while walking its live
        // values() view is unsupported and can fail with ConcurrentModificationException.
        AssignmentChangeListener[] snapshot = this.changeListeners.values().toArray(new AssignmentChangeListener[0]);
        for (AssignmentChangeListener listener : snapshot) {
            listener.finished(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads {@code /assignments/<str>} and requests a watch for data changes and deletion.
     *
     * <p>On success the node content is decoded and passed to {@code handleAssignmentChange}.
     * If the node does not exist, an empty assignment is reported instead; the client then
     * falls back to an exists-watch while listeners remain, or forgets the service when none
     * do. Any other failure fails the whole service. Results are ignored once the service is
     * no longer running.
     *
     * @param str the service name
     */
    public void watchAssignment(final String str) {
        final String str2 = "/assignments/" + str;
        Watcher dataWatcher = wrapWatcher(new AssignmentWatcher(str, EnumSet.of(Watcher.Event.EventType.NodeDataChanged, Watcher.Event.EventType.NodeDeleted)));
        FutureCallback<NodeData> onData = wrapCallback(new FutureCallback<NodeData>() {
            @Override
            public void onSuccess(NodeData nodeData) {
                try {
                    ResourceAssignment decode = CoordinationConstants.RESOURCE_ASSIGNMENT_CODEC.decode(nodeData.getData());
                    ResourceCoordinatorClient.LOG.debug("Received resource assignment for {}. {}", str, decode.getAssignments());
                    ResourceCoordinatorClient.this.handleAssignmentChange(str, decode);
                } catch (Exception e) {
                    ResourceCoordinatorClient.LOG.error("Failed to decode ResourceAssignment {}", Bytes.toStringBinary(nodeData.getData()), e);
                }
            }

            @Override
            public void onFailure(Throwable th) {
                if (!(th instanceof KeeperException.NoNodeException)) {
                    ResourceCoordinatorClient.LOG.error("Failed to getData on ZK {}{}", ResourceCoordinatorClient.this.zkClient.getConnectString(), str2, th);
                    ResourceCoordinatorClient.this.doNotifyFailed(th);
                    return;
                }
                // A missing node is treated as an empty assignment.
                ResourceCoordinatorClient.this.handleAssignmentChange(str, new ResourceAssignment(str));
                synchronized (ResourceCoordinatorClient.this) {
                    if (ResourceCoordinatorClient.this.changeListeners.containsKey(str)) {
                        ResourceCoordinatorClient.this.watchAssignmentOnExists(str);
                    } else {
                        // No listener is left and nothing is re-watched from here (the failed read
                        // presumably left no watch either). Forget the service so a later subscribe
                        // starts a fresh watch instead of assuming one is still active.
                        ResourceCoordinatorClient.this.serviceWatched.remove(str);
                    }
                }
            }
        });
        Futures.addCallback(this.zkClient.getData(str2, dataWatcher), onData, Threads.SAME_THREAD_EXECUTOR);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks whether {@code /assignments/<str>} exists and requests a watch for its creation.
     *
     * <p>If the node is reported as existing, the client switches to {@code watchAssignment}.
     * Any failure of the exists call fails the whole service. Results are ignored once the
     * service is no longer running.
     *
     * @param str the service name
     */
    public void watchAssignmentOnExists(final String str) {
        final String str2 = "/assignments/" + str;
        Watcher creationWatcher = wrapWatcher(new AssignmentWatcher(str, EnumSet.of(Watcher.Event.EventType.NodeCreated)));
        FutureCallback<Stat> onExists = wrapCallback(new FutureCallback<Stat>() {
            @Override
            public void onSuccess(Stat stat) {
                if (stat == null) {
                    // Node still absent; nothing more to do here.
                    return;
                }
                ResourceCoordinatorClient.this.watchAssignment(str);
            }

            @Override
            public void onFailure(Throwable th) {
                ResourceCoordinatorClient.LOG.error("Failed to call exists on ZK {}{}", ResourceCoordinatorClient.this.zkClient.getConnectString(), str2, th);
                ResourceCoordinatorClient.this.doNotifyFailed(th);
            }
        });
        Futures.addCallback(this.zkClient.exists(str2, creationWatcher), onExists, Threads.SAME_THREAD_EXECUTOR);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records a newly observed assignment for a service and notifies that service's listeners.
     *
     * <p>Nothing happens when the assignment equals the cached one. Otherwise the cache is
     * updated (an empty assignment removes the entry), and listeners are notified unless there
     * was no cached assignment and the new one is empty.
     *
     * @param str the service name
     * @param resourceAssignment the assignment that was just observed
     */
    public synchronized void handleAssignmentChange(String str, ResourceAssignment resourceAssignment) {
        ResourceAssignment previous = this.assignments.get(str);
        if (Objects.equal(previous, resourceAssignment)) {
            return;
        }

        boolean nowEmpty = resourceAssignment.getAssignments().isEmpty();
        if (nowEmpty) {
            this.assignments.remove(str);
        } else {
            this.assignments.put(str, resourceAssignment);
        }

        // Going from "nothing cached" to "empty" is not a change worth reporting.
        boolean worthReporting = previous != null || !nowEmpty;
        if (!worthReporting) {
            return;
        }
        for (AssignmentChangeListener listener : this.changeListeners.get(str)) {
            listener.onChange(resourceAssignment);
        }
    }

    /**
     * Wraps a watcher so that events are forwarded only while this service is running.
     *
     * @param watcher the watcher to guard
     * @return a watcher that silently drops events when the service is not running
     */
    private Watcher wrapWatcher(final Watcher watcher) {
        Watcher guarded = new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                if (!ResourceCoordinatorClient.this.isRunning()) {
                    return;
                }
                watcher.process(watchedEvent);
            }
        };
        return guarded;
    }

    /**
     * Wraps a callback so that results are forwarded only while this service is running.
     *
     * @param futureCallback the callback to guard
     * @param <V> the result type of the future
     * @return a callback that silently drops both successes and failures when the service is
     *         not running
     */
    private <V> FutureCallback<V> wrapCallback(final FutureCallback<V> futureCallback) {
        FutureCallback<V> guarded = new FutureCallback<V>() {
            @Override
            public void onSuccess(V v) {
                if (!ResourceCoordinatorClient.this.isRunning()) {
                    return;
                }
                futureCallback.onSuccess(v);
            }

            @Override
            public void onFailure(Throwable th) {
                if (!ResourceCoordinatorClient.this.isRunning()) {
                    return;
                }
                futureCallback.onFailure(th);
            }
        };
        return guarded;
    }
}
