/*
 * Decompiled with CFR 0.152.
 */
package org.apache.brooklyn.entity.nosql.riak;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.policy.PolicySpec;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.EnricherSpec;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.EntityPredicates;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.enricher.stock.Enrichers;
import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
import org.apache.brooklyn.entity.group.DynamicClusterImpl;
import org.apache.brooklyn.entity.nosql.riak.RiakCluster;
import org.apache.brooklyn.entity.nosql.riak.RiakNode;
import org.apache.brooklyn.util.JavaGroovyEquivalents;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RiakClusterImpl
extends DynamicClusterImpl
implements RiakCluster {
    private static final Logger log = LoggerFactory.getLogger(RiakClusterImpl.class);
    private transient Object mutex = new Object[0];

    public void init() {
        super.init();
        log.info("Initializing the riak cluster...");
        this.setAttribute(IS_CLUSTER_INIT, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doStart() {
        super.doStart();
        this.connectSensors();
        try {
            Duration delay = (Duration)this.getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER);
            Tasks.setBlockingDetails((String)("Sleeping for " + delay + " before advertising cluster available"));
            Time.sleep((Duration)delay);
        }
        finally {
            Tasks.resetBlockingDetails();
        }
        Optional anyNode = Iterables.tryFind((Iterable)this.getMembers(), (Predicate)Predicates.and((Predicate[])new Predicate[]{Predicates.instanceOf(RiakNode.class), EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, (Object)true), EntityPredicates.attributeEqualTo((AttributeSensor)RiakNode.SERVICE_UP, (Object)true)}));
        if (anyNode.isPresent()) {
            this.setAttribute(IS_CLUSTER_INIT, true);
        } else {
            log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", (Object)this.getId());
            ServiceStateLogic.setExpectedState((Entity)this, (Lifecycle)Lifecycle.ON_FIRE);
        }
    }

    protected EntitySpec<?> getMemberSpec() {
        EntitySpec result = (EntitySpec)this.config().get(MEMBER_SPEC);
        if (result != null) {
            return result;
        }
        return EntitySpec.create(RiakNode.class);
    }

    protected void connectSensors() {
        this.addPolicy(((PolicySpec)PolicySpec.create(MemberTrackingPolicy.class).displayName("Controller targets tracker")).configure((CharSequence)"sensorsToTrack", (Object)ImmutableSet.of((Object)RiakNode.SERVICE_UP)).configure((CharSequence)"group", (Object)this));
        EnricherSpec first = ((Enrichers.AggregatorBuilder)((Enrichers.AggregatorBuilder)Enrichers.builder().aggregating(Attributes.MAIN_URI).publishing(Attributes.MAIN_URI).computing((Function)new Function<Collection<URI>, URI>(){

            public URI apply(Collection<URI> input) {
                return input.iterator().next();
            }
        })).fromMembers()).build();
        this.addEnricher(first);
        ImmutableMap enricherSetup = ImmutableMap.builder().put(RiakNode.NODE_PUTS, RiakCluster.NODE_PUTS_1MIN_PER_NODE).put(RiakNode.NODE_GETS, RiakCluster.NODE_GETS_1MIN_PER_NODE).put(RiakNode.NODE_OPS, RiakCluster.NODE_OPS_1MIN_PER_NODE).build();
        for (AttributeSensor nodeSensor : enricherSetup.keySet()) {
            this.addSummingMemberEnricher((AttributeSensor<? extends Number>)nodeSensor);
            this.addAveragingMemberEnricher((AttributeSensor<? extends Number>)nodeSensor, (AttributeSensor<? extends Number>)((AttributeSensor)enricherSetup.get(nodeSensor)));
        }
    }

    private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) {
        this.addEnricher(((Enrichers.AggregatorBuilder)((Enrichers.AggregatorBuilder)Enrichers.builder().aggregating(fromSensor).publishing(toSensor).fromMembers()).computingAverage()).build());
    }

    private void addSummingMemberEnricher(AttributeSensor<? extends Number> source) {
        this.addEnricher(((Enrichers.AggregatorBuilder)((Enrichers.AggregatorBuilder)Enrichers.builder().aggregating(source).publishing(source).fromMembers()).computingSum()).build());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onServerPoolMemberChanged(Entity member) {
        Object object = this.mutex;
        synchronized (object) {
            log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{this, member, member.getLocations()});
            Map nodes = (Map)this.getAttribute(RIAK_CLUSTER_NODES);
            if (this.belongsInServerPool(member)) {
                if (nodes == null) {
                    nodes = Maps.newLinkedHashMap();
                }
                String riakName = this.getRiakName(member);
                Preconditions.checkNotNull((Object)riakName);
                Boolean firstNode = (Boolean)this.getAttribute(IS_FIRST_NODE_SET);
                if (!Boolean.TRUE.equals(firstNode)) {
                    this.setAttribute(IS_FIRST_NODE_SET, Boolean.TRUE);
                    nodes.put(member, riakName);
                    this.setAttribute(RIAK_CLUSTER_NODES, nodes);
                    ((EntityInternal)member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, (Object)Boolean.TRUE);
                    log.info("Added initial Riak node {}: {}; {} to new cluster", new Object[]{this, member, this.getRiakName(member)});
                } else {
                    Optional anyNodeInCluster = Iterables.tryFind(nodes.keySet(), (Predicate)Predicates.and((Predicate)Predicates.instanceOf(RiakNode.class), (Predicate)EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, (Object)true)));
                    if (anyNodeInCluster.isPresent()) {
                        if (!nodes.containsKey(member) && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) {
                            String anyNodeName = (String)((Entity)anyNodeInCluster.get()).getAttribute(RiakNode.RIAK_NODE_NAME);
                            Entities.invokeEffectorWithArgs((EntityLocal)this, (Entity)member, RiakNode.JOIN_RIAK_CLUSTER, (Object[])new Object[]{anyNodeName}).blockUntilEnded();
                            nodes.put(member, riakName);
                            this.setAttribute(RIAK_CLUSTER_NODES, nodes);
                            log.info("Added Riak node {}: {}; {} to cluster", new Object[]{this, member, this.getRiakName(member)});
                        }
                    } else {
                        log.error("isFirstNodeSet, but no cluster members found to add {}", (Object)member.getId());
                    }
                }
            } else if (nodes != null && nodes.containsKey(member)) {
                DependentConfiguration.attributeWhenReady((Entity)member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, (Predicate)Predicates.equalTo((Object)false)).blockUntilEnded(Duration.TWO_MINUTES);
                Optional anyNodeInCluster = Iterables.tryFind(nodes.keySet(), (Predicate)Predicates.and((Predicate[])new Predicate[]{Predicates.instanceOf(RiakNode.class), EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, (Object)true), Predicates.not((Predicate)Predicates.equalTo((Object)member))}));
                if (anyNodeInCluster.isPresent()) {
                    Entities.invokeEffectorWithArgs((EntityLocal)this, (Entity)((Entity)anyNodeInCluster.get()), RiakNode.REMOVE_FROM_CLUSTER, (Object[])new Object[]{this.getRiakName(member)}).blockUntilEnded();
                }
                nodes.remove(member);
                this.setAttribute(RIAK_CLUSTER_NODES, nodes);
                log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{this, member, this.getRiakName(member)});
            }
            ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap((EntityLocal)this, (AttributeSensor)RIAK_CLUSTER_NODES);
            this.calculateClusterAddresses();
        }
    }

    private void calculateClusterAddresses() {
        ArrayList addresses = Lists.newArrayList();
        ArrayList addressesPbPort = Lists.newArrayList();
        for (Entity entity : this.getMembers()) {
            if (!(entity instanceof RiakNode) || !((Boolean)entity.getAttribute(Attributes.SERVICE_UP)).booleanValue()) continue;
            RiakNode riakNode = (RiakNode)entity;
            addresses.add((String)riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute((AttributeSensor)RiakNode.RIAK_WEB_PORT));
            addressesPbPort.add((String)riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute((AttributeSensor)RiakNode.RIAK_PB_PORT));
        }
        this.setAttribute(RiakCluster.NODE_LIST, Joiner.on((String)",").join((Iterable)addresses));
        this.setAttribute(RiakCluster.NODE_LIST_PB_PORT, Joiner.on((String)",").join((Iterable)addressesPbPort));
    }

    protected boolean belongsInServerPool(Entity member) {
        if (!JavaGroovyEquivalents.groovyTruth((Object)member.getAttribute(Startable.SERVICE_UP))) {
            log.trace("Members of {}, checking {}, eliminating because not up", (Object)this, (Object)member);
            return false;
        }
        if (!this.getMembers().contains(member)) {
            log.trace("Members of {}, checking {}, eliminating because not member", (Object)this, (Object)member);
            return false;
        }
        log.trace("Members of {}, checking {}, approving", (Object)this, (Object)member);
        return true;
    }

    private String getRiakName(Entity node) {
        return (String)node.getAttribute(RiakNode.RIAK_NODE_NAME);
    }

    public static class MemberTrackingPolicy
    extends AbstractMembershipTrackingPolicy {
        protected void onEntityEvent(AbstractMembershipTrackingPolicy.EventType type, Entity entity) {
            ((RiakClusterImpl)this.entity).onServerPoolMemberChanged(entity);
        }
    }
}

