package org.apache.storm.starter;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.jsp.tagext.TagAttributeInfo;
import org.apache.hadoop.fs.shell.Count;
import org.apache.hadoop.fs.shell.Display;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.MemoryTransactionalSpout;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.TransactionalTopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/* loaded from: input_file:org/apache/storm/starter/TransactionalWords.class */
public class TransactionalWords {
    /**
     * Number of consecutive count values that share one bucket
     * ({@link Bucketize} divides word counts by this width).
     */
    public static final int BUCKET_SIZE = 10;

    /**
     * Batch-size setting for the in-memory spout built in {@link #main(String[])}.
     * NOTE(review): presumably the maximum number of tuples taken from each
     * partition per batch - confirm against MemoryTransactionalSpout.
     */
    public static final int PARTITION_TAKE_PER_BATCH = 3;

    /**
     * In-memory stand-in for a database: the latest {@link CountValue} per word,
     * read and written by {@link KeyedCountUpdater#finishBatch()}.
     * NOTE(review): this is a plain HashMap shared through a static field; if
     * several bolt tasks run on different threads in one JVM, access is not
     * synchronized - verify whether that matters for this demo.
     */
    public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();

    /**
     * In-memory stand-in for a database: the latest {@link BucketValue} per bucket
     * number, read and written by {@link BucketCountUpdater#finishBatch()}.
     */
    public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();

    /**
     * Fixed input for the demo topology: integer key (presumably the spout
     * partition index) mapped to a list of single-field tuples, one word each.
     */
    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {
        {
            put(0, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("chicken"));
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("apple"));
                }
            });
            put(1, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("apple"));
                    add(new Values("banana"));
                }
            });
            put(2, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                }
            });
        }
    };

    /* loaded from: input_file:org/apache/storm/starter/TransactionalWords$BucketCountUpdater.class */
    /**
     * Transactional bolt that keeps a running count per bucket in
     * {@link TransactionalWords#BUCKET_DATABASE}.
     *
     * <p>During a batch it sums the deltas received for each bucket; in
     * {@link #finishBatch()} it folds those sums into the stored values and emits
     * the resulting totals.
     */
    public static class BucketCountUpdater extends BaseTransactionalBolt {
        BatchOutputCollector _collector;
        TransactionAttempt _attempt;
        /** Sum of deltas seen in the current batch, keyed by bucket number. */
        Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
        int _count = 0;

        /**
         * Stores the collector and the transaction attempt for later use.
         *
         * @param map configuration map (unused here)
         * @param topologyContext topology context (unused here)
         * @param batchOutputCollector collector used to emit in {@link #finishBatch()}
         * @param transactionAttempt attempt whose transaction id tags stored values
         */
        public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
            this._collector = batchOutputCollector;
            this._attempt = transactionAttempt;
        }

        /**
         * Adds the tuple's delta (field 2) to the running sum of its bucket (field 1).
         *
         * @param tuple input tuple; field 1 is read as the bucket, field 2 as the delta
         */
        public void execute(Tuple tuple) {
            Integer bucket = tuple.getInteger(1);
            Integer delta = tuple.getInteger(2);
            Integer soFar = this._accum.get(bucket);
            int base = (soFar == null) ? 0 : soFar;
            this._accum.put(bucket, base + delta);
        }

        /**
         * Applies the accumulated deltas to {@link TransactionalWords#BUCKET_DATABASE}
         * and emits {@code (attempt, bucket, total)} for every bucket touched.
         *
         * <p>If the stored value already carries this attempt's transaction id it is
         * emitted unchanged, so the same batch is not added twice.
         */
        public void finishBatch() {
            for (Map.Entry<Integer, Integer> entry : this._accum.entrySet()) {
                Integer bucket = entry.getKey();
                BigInteger txid = this._attempt.getTransactionId();
                BucketValue stored = TransactionalWords.BUCKET_DATABASE.get(bucket);
                BucketValue current;
                if (stored != null && stored.txid.equals(txid)) {
                    // This transaction was already written for the bucket; reuse it.
                    current = stored;
                } else {
                    current = new BucketValue();
                    current.txid = txid;
                    current.count = entry.getValue();
                    if (stored != null) {
                        current.count += stored.count;
                    }
                    TransactionalWords.BUCKET_DATABASE.put(bucket, current);
                }
                this._collector.emit(new Values(this._attempt, bucket, current.count));
            }
        }

        /**
         * Declares the output fields {@code id}, {@code bucket}, {@code count}.
         *
         * @param outputFieldsDeclarer declarer that receives the field list
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // Plain literals: the field names have nothing to do with the JSP or
            // Hadoop constants that happen to hold the same text.
            outputFieldsDeclarer.declare(new Fields("id", "bucket", "count"));
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/TransactionalWords$BucketValue.class */
    /**
     * Stored state for one bucket: its running count and the id of the
     * transaction that last wrote it.
     */
    public static class BucketValue {
        /** Transaction id recorded by the last write; {@code null} until set. */
        BigInteger txid = null;
        /** Running count for the bucket; starts at zero. */
        int count;
    }

    /* loaded from: input_file:org/apache/storm/starter/TransactionalWords$Bucketize.class */
    /**
     * Turns a word's count change into bucket deltas.
     *
     * <p>Each input tuple carries the transaction attempt (field 0), the word's
     * current count (field 2) and its previous count (field 3, may be
     * {@code null}). Counts are mapped to buckets by integer division with
     * {@link TransactionalWords#BUCKET_SIZE}.
     */
    public static class Bucketize extends BaseBasicBolt {
        /**
         * Emits {@code (attempt, bucket, +1)} when the word enters a bucket and
         * {@code (attempt, bucket, -1)} for the bucket it left. Nothing is emitted
         * when the word stays in the same bucket.
         *
         * @param tuple input tuple as described on the class
         * @param basicOutputCollector collector that receives the bucket deltas
         */
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
            int currBucket = tuple.getInteger(2) / TransactionalWords.BUCKET_SIZE;
            Integer prevCount = tuple.getInteger(3);

            if (prevCount == null) {
                // No previous count: the word only enters its current bucket.
                basicOutputCollector.emit(new Values(attempt, currBucket, 1));
                return;
            }

            int prevBucket = prevCount / TransactionalWords.BUCKET_SIZE;
            if (currBucket != prevBucket) {
                basicOutputCollector.emit(new Values(attempt, currBucket, 1));
                basicOutputCollector.emit(new Values(attempt, prevBucket, -1));
            }
        }

        /**
         * Declares the output fields {@code attempt}, {@code bucket}, {@code delta}.
         *
         * @param outputFieldsDeclarer declarer that receives the field list
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("attempt", "bucket", "delta"));
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/TransactionalWords$CountValue.class */
    /**
     * Stored state for one word: its running count, the count it had before the
     * last write, and the id of the transaction that performed that write.
     */
    public static class CountValue {
        /** Transaction id recorded by the last write; {@code null} until set. */
        BigInteger txid;
        /** Running count for the word; starts at zero. */
        int count;
        /** Count before the last write, or {@code null} if there was none. */
        Integer prev_count;
    }

    /* loaded from: input_file:org/apache/storm/starter/TransactionalWords$KeyedCountUpdater.class */
    /**
     * Committing transactional bolt that keeps a running count per word in
     * {@link TransactionalWords#COUNT_DATABASE}.
     *
     * <p>During a batch it counts occurrences of each word; in
     * {@link #finishBatch()} it folds those counts into the stored values and
     * emits the new and previous totals.
     */
    public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
        BatchOutputCollector _collector;
        TransactionAttempt _id;
        /** Occurrences seen in the current batch, keyed by word. */
        Map<String, Integer> _counts = new HashMap<String, Integer>();
        int _count = 0;

        /**
         * Stores the collector and the transaction attempt for later use.
         *
         * @param map configuration map (unused here)
         * @param topologyContext topology context (unused here)
         * @param batchOutputCollector collector used to emit in {@link #finishBatch()}
         * @param transactionAttempt attempt whose transaction id tags stored values
         */
        public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
            this._collector = batchOutputCollector;
            this._id = transactionAttempt;
        }

        /**
         * Increments the batch-local count of the word in field 1 of the tuple.
         *
         * @param tuple input tuple; field 1 is read as the word
         */
        public void execute(Tuple tuple) {
            String word = tuple.getString(1);
            Integer seen = this._counts.get(word);
            int base = (seen == null) ? 0 : seen;
            this._counts.put(word, base + 1);
        }

        /**
         * Applies the batch counts to {@link TransactionalWords#COUNT_DATABASE} and
         * emits {@code (attempt, word, count, previous count)} for every word seen.
         *
         * <p>If the stored value already carries this attempt's transaction id it is
         * emitted unchanged, so the same batch is not added twice.
         */
        public void finishBatch() {
            for (Map.Entry<String, Integer> entry : this._counts.entrySet()) {
                String word = entry.getKey();
                BigInteger txid = this._id.getTransactionId();
                CountValue stored = TransactionalWords.COUNT_DATABASE.get(word);
                CountValue current;
                if (stored != null && stored.txid.equals(txid)) {
                    // This transaction was already written for the word; reuse it.
                    current = stored;
                } else {
                    current = new CountValue();
                    current.txid = txid;
                    if (stored != null) {
                        current.prev_count = stored.count;
                        current.count = stored.count;
                    }
                    current.count += entry.getValue();
                    TransactionalWords.COUNT_DATABASE.put(word, current);
                }
                this._collector.emit(new Values(this._id, word, current.count, current.prev_count));
            }
        }

        /**
         * Declares the output fields {@code id}, {@code key}, {@code count},
         * {@code prev-count}.
         *
         * @param outputFieldsDeclarer declarer that receives the field list
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // Plain literals: the field names have nothing to do with the JSP or
            // Hadoop constants that happen to hold the same text.
            outputFieldsDeclarer.declare(new Fields("id", "key", "count", "prev-count"));
        }
    }

    /**
     * Builds the demo topology (spout -> count -> bucketize -> buckets), runs it
     * on a local cluster for three seconds, then shuts the cluster down.
     *
     * @param strArr command-line arguments (unused)
     * @throws Exception if building, submitting or waiting on the topology fails
     */
    public static void main(String[] strArr) throws Exception {
        MemoryTransactionalSpout spout =
                new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
        TransactionalTopologyBuilder builder =
                new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
        builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
        builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));

        Config config = new Config();
        config.setDebug(true);
        config.setMaxSpoutPending(3);

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("top-n-topology", config, builder.buildTopology());
            Thread.sleep(3000L);
        } finally {
            // Shut down even if submission or the wait fails, so the local
            // cluster is not left running.
            cluster.shutdown();
        }
    }
}
