package org.apache.storm.starter.trident;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.state.ReadOnlyState;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.map.ReadOnlyMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.jboss.netty.channel.ChannelPipelineCoverage;

/* loaded from: input_file:org/apache/storm/starter/trident/TridentReach.class */
public class TridentReach {
    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() { // from class: org.apache.storm.starter.trident.TridentReach.1
        {
            put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
            put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
            put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
        }
    };
    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() { // from class: org.apache.storm.starter.trident.TridentReach.2
        {
            put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
            put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
            put("tim", Arrays.asList("alex"));
            put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
            put("adam", Arrays.asList("david", "carissa"));
            put("mike", Arrays.asList("john", "bob"));
            put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
        }
    };

    /* loaded from: input_file:org/apache/storm/starter/trident/TridentReach$ExpandList.class */
    /**
     * Trident function that flattens a list-valued field: for a tuple whose first value is a
     * list, it emits one single-value tuple per list element, in list order.
     */
    public static class ExpandList extends BaseFunction {
        /**
         * Emits each element of the list found at position 0 of the input tuple.
         *
         * @param tridentTuple     input tuple; its first value is cast to a {@link List}
         * @param tridentCollector collector that receives one tuple per list element
         */
        public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
            List<?> elements = (List<?>) tridentTuple.getValue(0);
            // A null list (e.g. a lookup that found nothing) produces no output at all.
            if (elements == null) {
                return;
            }
            for (Object element : elements) {
                tridentCollector.emit(new Values(element));
            }
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/trident/TridentReach$One.class */
    /**
     * Combiner aggregator whose result is always the constant {@code 1}, no matter how many
     * tuples are combined.
     *
     * <p>{@code buildTopology} applies it after grouping by follower, so every group
     * collapses to a single {@code 1} that can then be summed.
     *
     * <p>The methods carry the names required by {@link CombinerAggregator}
     * ({@code init}, {@code combine}, {@code zero}); decompiler-mangled names would leave the
     * interface unimplemented.
     */
    public static class One implements CombinerAggregator<Integer> {
        /**
         * Maps an input tuple to {@code 1}; the tuple's contents are ignored.
         *
         * @param tridentTuple the tuple being aggregated (unused)
         * @return always {@code 1}
         */
        public Integer init(TridentTuple tridentTuple) {
            return 1;
        }

        /**
         * Combines two partial results; both inputs are ignored.
         *
         * @return always {@code 1}
         */
        public Integer combine(Integer num, Integer num2) {
            return 1;
        }

        /**
         * Value used when there is nothing to combine.
         *
         * @return always {@code 1}
         */
        public Integer zero() {
            return 1;
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/trident/TridentReach$StaticSingleKeyMapState.class */
    /**
     * Read-only map state that answers lookups from a fixed in-memory {@link Map}.
     * Only the first component of each requested key is used for the lookup.
     */
    public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
        /** Backing table; this class only ever reads from it. */
        Map _map;

        /**
         * @param map table that lookups are served from; stored by reference, not copied
         */
        public StaticSingleKeyMapState(Map map) {
            this._map = map;
        }

        /**
         * Looks up a batch of keys.
         *
         * @param list the keys to resolve; each key is a list whose element 0 is the map key
         * @return one entry per requested key, in request order; {@code null} where the
         *         backing map has no mapping
         */
        public List<Object> multiGet(List<List<Object>> list) {
            List<Object> values = new ArrayList<Object>();
            for (List<Object> key : list) {
                values.add(this._map.get(key.get(0)));
            }
            return values;
        }

        /* loaded from: input_file:org/apache/storm/starter/trident/TridentReach$StaticSingleKeyMapState$Factory.class */
        /**
         * State factory that creates a {@link StaticSingleKeyMapState} over the map it was
         * constructed with.
         */
        public static class Factory implements StateFactory {
            /** Table passed on to every state instance this factory creates. */
            Map _map;

            /**
             * @param map table the created states will serve lookups from
             */
            public Factory(Map map) {
                this._map = map;
            }

            /**
             * Creates a new state over the factory's map. None of the arguments are used.
             */
            public State makeState(Map map, IMetricsContext iMetricsContext, int i, int i2) {
                return new StaticSingleKeyMapState(this._map);
            }
        }
    }

    /**
     * Builds the "reach" DRPC topology: given a URL, count the distinct followers of everyone
     * who tweeted it.
     *
     * <p>Pipeline, in order:
     * <ol>
     *   <li>look up the request argument in {@link #TWEETERS_DB} and expand the resulting
     *       list into one tuple per tweeter;</li>
     *   <li>look up each tweeter in {@link #FOLLOWERS_DB} and expand into one tuple per
     *       follower;</li>
     *   <li>group by follower and reduce each group to a single {@code 1} (via {@link One}),
     *       so a follower reached through several tweeters is counted once;</li>
     *   <li>sum those ones into the {@code "reach"} field.</li>
     * </ol>
     *
     * @param localDRPC DRPC server the {@code "reach"} stream is attached to
     * @return the assembled topology
     */
    public static StormTopology buildTopology(LocalDRPC localDRPC) {
        TridentTopology topology = new TridentTopology();
        TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));

        // The intermediate field is named with the plain literal "one"; it must not be tied to
        // an unrelated library constant that merely happens to hold the same string.
        topology.newDRPCStream("reach", localDRPC)
                .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
                .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
                .shuffle()
                .stateQuery(
                        topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB)),
                        new Fields("tweeter"), new MapGet(), new Fields("followers"))
                .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
                .groupBy(new Fields("follower"))
                .aggregate(new One(), new Fields("one"))
                .aggregate(new Fields("one"), new Sum(), new Fields("reach"));

        return topology.build();
    }

    /**
     * Runs the reach topology on an in-process cluster, issues three sample DRPC requests and
     * prints each result prefixed with {@code "REACH: "}.
     *
     * <p>The first request, {@code "aaa"}, has no entry in {@link #TWEETERS_DB}; the other two
     * are URLs that do.
     *
     * <p>The cluster and the DRPC server are shut down in {@code finally} blocks so that a
     * failure while submitting or querying does not leave them running.
     *
     * @param strArr command-line arguments (unused)
     * @throws Exception if starting the cluster, submitting the topology or sleeping fails
     */
    public static void main(String[] strArr) throws Exception {
        LocalDRPC drpc = new LocalDRPC();
        try {
            Config config = new Config();
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("reach", config, buildTopology(drpc));
                // Presumably gives the topology time to start before the first request.
                Thread.sleep(2000L);
                System.out.println("REACH: " + drpc.execute("reach", "aaa"));
                System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
                System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
            } finally {
                // Same order as before: cluster first, then the DRPC server.
                cluster.shutdown();
            }
        } finally {
            drpc.shutdown();
        }
    }
}
