package co.cask.cdap.data.stream;

import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.conf.PropertyStore;
import co.cask.cdap.common.io.Codec;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinatorClient;
import co.cask.cdap.common.zookeeper.coordination.ResourceModifier;
import co.cask.cdap.common.zookeeper.coordination.ResourceRequirement;
import co.cask.cdap.common.zookeeper.store.ZKPropertyStore;
import co.cask.cdap.proto.Id;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import org.apache.twill.internal.zookeeper.ReentrantDistributedLock;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A stream coordinator client backed by ZooKeeper.
 *
 * <p>Stream properties are kept in a {@link ZKPropertyStore}, per-stream locks are
 * {@link ReentrantDistributedLock}s, and every created/deleted stream is mirrored as a partition
 * of the {@code "streams"} resource requirement submitted through a
 * {@link ResourceCoordinatorClient}.
 */
@Singleton
public final class DistributedStreamCoordinatorClient extends AbstractStreamCoordinatorClient {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedStreamCoordinatorClient.class);

    /** Name of the resource whose requirement lists one partition per stream. */
    private static final String STREAMS_RESOURCE = "streams";

    private final ZKClient zkClient;
    private final ResourceCoordinatorClient resourceCoordinatorClient;

    /**
     * Creates the client on top of the given ZooKeeper client.
     *
     * @param zKClient the ZooKeeper client used for the property store, the locks and (after
     *                 namespacing) the resource coordinator client
     */
    @Inject
    public DistributedStreamCoordinatorClient(ZKClient zKClient) {
        this.zkClient = zKClient;
        // Must be created after zkClient is assigned: a field initializer would run before the
        // constructor body and therefore see zkClient as null.
        this.resourceCoordinatorClient = new ResourceCoordinatorClient(getCoordinatorZKClient());
    }

    /** Starts the resource coordinator client and waits until it is running. */
    @Override
    protected void doStartUp() throws Exception {
        this.resourceCoordinatorClient.startAndWait();
    }

    /** Stops the resource coordinator client and waits until it has terminated. */
    @Override
    protected void doShutDown() throws Exception {
        this.resourceCoordinatorClient.stopAndWait();
    }

    /**
     * Creates a ZooKeeper-backed property store rooted at {@code /streams/properties}.
     *
     * @param codec the codec handed to the property store for the property type
     * @param <T> the property type
     * @return a new {@link ZKPropertyStore}
     */
    @Override
    protected <T> PropertyStore<T> createPropertyStore(Codec<T> codec) {
        return ZKPropertyStore.create(this.zkClient, "/streams/properties", codec);
    }

    /**
     * Returns a distributed lock for the given stream, created under the
     * {@code /streams/locks} ZooKeeper namespace and keyed by {@code stream.toId()}.
     *
     * @param stream the stream to lock
     * @return a new lock instance for that stream
     */
    @Override
    protected Lock getLock(Id.Stream stream) {
        return new ReentrantDistributedLock(ZKClients.namespace(this.zkClient, "/streams/locks"), stream.toId());
    }

    /**
     * Adds the given stream as a partition of the {@code "streams"} resource requirement.
     * The modifier returns {@code null} when an equal partition is already present.
     *
     * @param stream the stream that was created
     */
    @Override
    protected void streamCreated(final Id.Stream stream) {
        this.resourceCoordinatorClient.modifyRequirement(STREAMS_RESOURCE, new ResourceModifier() {
            @Override
            @Nullable
            public ResourceRequirement apply(@Nullable ResourceRequirement resourceRequirement) {
                LOG.debug("Modifying requirement to add stream {} as a resource", stream);
                Set<ResourceRequirement.Partition> partitions = resourceRequirement != null
                    ? resourceRequirement.getPartitions()
                    : ImmutableSet.<ResourceRequirement.Partition>of();
                ResourceRequirement.Partition newPartition = new ResourceRequirement.Partition(stream.toId(), 1);
                if (partitions.contains(newPartition)) {
                    // Already present: no new requirement is produced.
                    return null;
                }
                ResourceRequirement.Builder builder = ResourceRequirement.builder(STREAMS_RESOURCE);
                builder.addPartition(newPartition);
                for (ResourceRequirement.Partition existing : partitions) {
                    builder.addPartition(existing);
                }
                return builder.build();
            }
        });
    }

    /**
     * Removes the given stream's partition from the {@code "streams"} resource requirement.
     * The modifier returns {@code null} when there is no existing requirement; otherwise it
     * rebuilds the requirement from every partition whose name differs from {@code stream.toId()}.
     *
     * @param stream the stream that was deleted
     */
    @Override
    protected void streamDeleted(final Id.Stream stream) {
        this.resourceCoordinatorClient.modifyRequirement(STREAMS_RESOURCE, new ResourceModifier() {
            @Override
            @Nullable
            public ResourceRequirement apply(@Nullable ResourceRequirement resourceRequirement) {
                LOG.debug("Modifying requirement to remove stream {}", stream);
                if (resourceRequirement == null) {
                    return null;
                }
                String removedName = stream.toId();
                ResourceRequirement.Builder builder = ResourceRequirement.builder(STREAMS_RESOURCE);
                for (ResourceRequirement.Partition partition : resourceRequirement.getPartitions()) {
                    if (!partition.getName().equals(removedName)) {
                        builder.addPartition(partition);
                    }
                }
                return builder.build();
            }
        });
    }

    /** Returns the ZooKeeper client namespaced to the stream coordination path. */
    private ZKClient getCoordinatorZKClient() {
        return ZKClients.namespace(this.zkClient, Constants.Stream.STREAM_ZK_COORDINATION_NAMESPACE);
    }
}
