package org.apache.storm.topology;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/topology/BaseStatefulBoltExecutor.class */
public abstract class BaseStatefulBoltExecutor implements IRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BaseStatefulBoltExecutor.class);
    private int checkPointInputTaskCount;
    protected OutputCollector collector;
    private long lastTxid = Long.MIN_VALUE;
    private final Map<TransactionRequest, Integer> transactionRequestCount = new HashMap();

    /* loaded from: input_file:org/apache/storm/topology/BaseStatefulBoltExecutor$AnchoringOutputCollector.class */
    protected static class AnchoringOutputCollector extends OutputCollector {
        /* JADX INFO: Access modifiers changed from: package-private */
        public AnchoringOutputCollector(IOutputCollector iOutputCollector) {
            super(iOutputCollector);
        }

        @Override // org.apache.storm.task.OutputCollector
        public List<Integer> emit(String str, List<Object> list) {
            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
        }

        @Override // org.apache.storm.task.OutputCollector
        public void emitDirect(int i, String str, List<Object> list) {
            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/topology/BaseStatefulBoltExecutor$TransactionRequest.class */
    public static class TransactionRequest {
        private final CheckPointState.Action action;
        private final long txid;

        TransactionRequest(CheckPointState.Action action, long j) {
            this.action = action;
            this.txid = j;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TransactionRequest transactionRequest = (TransactionRequest) obj;
            if (this.txid != transactionRequest.txid) {
                return false;
            }
            return this.action == null ? transactionRequest.action == null : this.action.equals(transactionRequest.action);
        }

        public int hashCode() {
            return (31 * (this.action != null ? this.action.hashCode() : 0)) + ((int) (this.txid ^ (this.txid >>> 32)));
        }

        public String toString() {
            return "TransactionRequest{action='" + this.action + "', txid=" + this.txid + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void init(TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.checkPointInputTaskCount = getCheckpointInputTaskCount(topologyContext);
    }

    private int getCheckpointInputTaskCount(TopologyContext topologyContext) {
        int i = 0;
        for (GlobalStreamId globalStreamId : topologyContext.getThisSources().keySet()) {
            if (CheckpointSpout.CHECKPOINT_STREAM_ID.equals(globalStreamId.get_streamId())) {
                i += topologyContext.getComponentTasks(globalStreamId.get_componentId()).size();
            }
        }
        return i;
    }

    @Override // org.apache.storm.task.IBolt
    public void execute(Tuple tuple) {
        if (CheckpointSpout.isCheckpoint(tuple)) {
            processCheckpoint(tuple);
        } else {
            handleTuple(tuple);
        }
    }

    private void processCheckpoint(Tuple tuple) {
        CheckPointState.Action action = (CheckPointState.Action) tuple.getValueByField(CheckpointSpout.CHECKPOINT_FIELD_ACTION);
        long longValue = tuple.getLongByField(CheckpointSpout.CHECKPOINT_FIELD_TXID).longValue();
        if (!shouldProcessTransaction(action, longValue)) {
            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, transactionRequestCount {}", action, Long.valueOf(longValue), Integer.valueOf(this.checkPointInputTaskCount), this.transactionRequestCount);
            this.collector.ack(tuple);
            return;
        }
        LOG.debug("Processing action {}, txid {}", action, Long.valueOf(longValue));
        try {
            if (longValue >= this.lastTxid) {
                handleCheckpoint(tuple, action, longValue);
                if (action == CheckPointState.Action.ROLLBACK) {
                    this.lastTxid = longValue - 1;
                } else {
                    this.lastTxid = longValue;
                }
            } else {
                LOG.debug("Ignoring old transaction. Action {}, txid {}", action, Long.valueOf(longValue));
                this.collector.ack(tuple);
            }
        } catch (Throwable th) {
            LOG.error("Got error while processing checkpoint tuple", th);
            this.collector.fail(tuple);
            this.collector.reportError(th);
        }
    }

    private boolean shouldProcessTransaction(CheckPointState.Action action, long j) {
        Integer num;
        TransactionRequest transactionRequest = new TransactionRequest(action, j);
        Integer num2 = this.transactionRequestCount.get(transactionRequest);
        if (num2 == null) {
            this.transactionRequestCount.put(transactionRequest, 1);
            num = 1;
        } else {
            Map<TransactionRequest, Integer> map = this.transactionRequestCount;
            Integer valueOf = Integer.valueOf(num2.intValue() + 1);
            num = valueOf;
            map.put(transactionRequest, valueOf);
        }
        if (num.intValue() != this.checkPointInputTaskCount) {
            return false;
        }
        this.transactionRequestCount.remove(transactionRequest);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void declareCheckpointStream(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(CheckpointSpout.CHECKPOINT_STREAM_ID, new Fields(CheckpointSpout.CHECKPOINT_FIELD_TXID, CheckpointSpout.CHECKPOINT_FIELD_ACTION));
    }

    protected abstract void handleTuple(Tuple tuple);

    protected abstract void handleCheckpoint(Tuple tuple, CheckPointState.Action action, long j);
}
