package org.apache.storm.topology;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.state.State;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.CheckpointTupleForwarder;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/topology/StatefulBoltExecutor.class */
public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwarder {
    private static final Logger LOG = LoggerFactory.getLogger(StatefulBoltExecutor.class);
    private final IStatefulBolt<T> bolt;
    private State state;
    private boolean boltInitialized;
    private List<Tuple> pendingTuples;
    private List<Tuple> preparedTuples;
    private AckTrackingOutputCollector collector;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/topology/StatefulBoltExecutor$AckTrackingOutputCollector.class */
    public static class AckTrackingOutputCollector extends CheckpointTupleForwarder.AnchoringOutputCollector {
        private final OutputCollector delegate;
        private final Queue<Tuple> ackedTuples;

        AckTrackingOutputCollector(OutputCollector outputCollector) {
            super(outputCollector);
            this.delegate = outputCollector;
            this.ackedTuples = new ConcurrentLinkedQueue();
        }

        List<Tuple> ackedTuples() {
            ArrayList arrayList = new ArrayList();
            Iterator<Tuple> it = this.ackedTuples.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
                it.remove();
            }
            return arrayList;
        }

        @Override // org.apache.storm.task.OutputCollector, org.apache.storm.task.IOutputCollector
        public void ack(Tuple tuple) {
            this.ackedTuples.add(tuple);
        }
    }

    public StatefulBoltExecutor(IStatefulBolt<T> iStatefulBolt) {
        super(iStatefulBolt);
        this.boltInitialized = false;
        this.pendingTuples = new ArrayList();
        this.preparedTuples = new ArrayList();
        this.bolt = iStatefulBolt;
    }

    @Override // org.apache.storm.topology.CheckpointTupleForwarder, org.apache.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        prepare(map, topologyContext, outputCollector, StateFactory.getState(topologyContext.getThisComponentId() + "-" + topologyContext.getThisTaskId(), map, topologyContext));
    }

    void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector, State state) {
        init(topologyContext, outputCollector);
        this.collector = new AckTrackingOutputCollector(outputCollector);
        this.bolt.prepare(map, topologyContext, this.collector);
        this.state = state;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.storm.topology.CheckpointTupleForwarder
    protected void handleCheckpoint(Tuple tuple, CheckPointState.Action action, long j) {
        LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", new Object[]{tuple, action, Long.valueOf(j)});
        if (action == CheckPointState.Action.PREPARE) {
            if (!this.boltInitialized) {
                LOG.debug("Failing checkpointTuple, PREPARE received when bolt state is not initialized.");
                this.collector.fail(tuple);
                return;
            } else {
                this.bolt.prePrepare(j);
                this.state.prepareCommit(j);
                this.preparedTuples.addAll(this.collector.ackedTuples());
            }
        } else if (action == CheckPointState.Action.COMMIT) {
            this.bolt.preCommit(j);
            this.state.commit(j);
            ack(this.preparedTuples);
        } else if (action == CheckPointState.Action.ROLLBACK) {
            this.bolt.preRollback();
            this.state.rollback();
            fail(this.preparedTuples);
            fail(this.collector.ackedTuples());
        } else if (action == CheckPointState.Action.INITSTATE) {
            if (this.boltInitialized) {
                LOG.debug("Bolt state is already initialized, ignoring tuple {}, action {}, txid {}", new Object[]{tuple, action, Long.valueOf(j)});
            } else {
                this.bolt.initState(this.state);
                this.boltInitialized = true;
                LOG.debug("{} pending tuples to process", Integer.valueOf(this.pendingTuples.size()));
                Iterator<Tuple> it = this.pendingTuples.iterator();
                while (it.hasNext()) {
                    doExecute(it.next());
                }
                this.pendingTuples.clear();
            }
        }
        this.collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, tuple, new Values(Long.valueOf(j), action));
        this.collector.delegate.ack(tuple);
    }

    @Override // org.apache.storm.topology.CheckpointTupleForwarder
    protected void handleTuple(Tuple tuple) {
        if (this.boltInitialized) {
            doExecute(tuple);
        } else {
            LOG.debug("Bolt state not initialized, adding tuple {} to pending tuples", tuple);
            this.pendingTuples.add(tuple);
        }
    }

    private void doExecute(Tuple tuple) {
        this.bolt.execute(tuple);
    }

    private void ack(List<Tuple> list) {
        if (list.isEmpty()) {
            return;
        }
        LOG.debug("Acking {} tuples", Integer.valueOf(list.size()));
        Iterator<Tuple> it = list.iterator();
        while (it.hasNext()) {
            this.collector.delegate.ack(it.next());
        }
        list.clear();
    }

    private void fail(List<Tuple> list) {
        if (list.isEmpty()) {
            return;
        }
        LOG.debug("Failing {} tuples", Integer.valueOf(list.size()));
        Iterator<Tuple> it = list.iterator();
        while (it.hasNext()) {
            this.collector.fail(it.next());
        }
        list.clear();
    }
}
