package org.apache.storm.spout;

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/spout/CheckpointSpout.class */
/**
 * Spout that drives checkpointing by emitting tuples on the
 * {@link #CHECKPOINT_STREAM_ID} stream.
 *
 * <p>Each emitted tuple carries two fields, {@link #CHECKPOINT_FIELD_TXID} and
 * {@link #CHECKPOINT_FIELD_ACTION}, and uses the transaction id as its message id.
 * The spout keeps its own progress in a {@link CheckPointState} that is persisted in
 * a {@link KeyValueState} under a single private key, and advances that state when
 * the emitted tuple is acked.
 *
 * <p>The spout is in one of two modes:
 * <ul>
 *   <li><b>recovering</b> (the mode after {@code open}, and after any failed
 *       checkpoint tuple): the action for the current state is re-emitted until
 *       {@link CheckPointState#nextState(boolean) nextState(true)} returns the current
 *       state object itself, at which point recovery is considered complete;</li>
 *   <li><b>checkpointing</b>: a new step is emitted when the checkpoint interval has
 *       elapsed, or immediately while the current state is not
 *       {@code COMMITTED}.</li>
 * </ul>
 *
 * <p>After emitting a tuple the spout marks a step as "in progress" and emits nothing
 * further until {@link #ack(Object)} or {@link #fail(Object)} clears that mark.
 */
public class CheckpointSpout extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);

    /** Id of the stream on which checkpoint tuples are emitted. */
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    /** Component id constant for the checkpoint spout. */
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    /** Name of the tuple field holding the transaction id. */
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    /** Name of the tuple field holding the checkpoint action. */
    public static final String CHECKPOINT_FIELD_ACTION = "action";

    /** Key under which the spout's own {@link CheckPointState} is stored. */
    private static final String TX_STATE_KEY = "__state";
    /** Lower bound, in milliseconds, applied to the configured checkpoint interval. */
    private static final int MIN_CHECKPOINT_INTERVAL_MS = 100;

    /** Topology context handed to {@code open}; stored but not read within this class. */
    private TopologyContext context;
    private SpoutOutputCollector collector;
    /** Wall-clock time (millis) at which the last checkpoint was started; 0 until the first one. */
    private long lastCheckpointTs;
    /** Minimum time in milliseconds between two checkpoints. */
    private int checkpointInterval;
    /** Time in milliseconds to sleep in {@link #nextTuple()} when there is nothing to emit. */
    private int sleepInterval;
    /** True while a tuple emitted in recovery mode has been neither acked nor failed. */
    private boolean recoveryStepInProgress;
    /** True while a tuple emitted in checkpoint mode has been neither acked nor failed. */
    private boolean checkpointStepInProgress;
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    /** In-memory copy of the state last saved under {@link #TX_STATE_KEY}. */
    private CheckPointState curTxState;

    /**
     * Reads the checkpoint interval from the configuration, loads (or initialises) the
     * persisted checkpoint state and delegates to the package-private {@code open}.
     *
     * @param conf      topology configuration
     * @param context   topology context of this task
     * @param collector collector used to emit checkpoint tuples
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    }

    /**
     * Initialises the spout from already-resolved dependencies. Package-private so the
     * interval and state store can be supplied directly.
     *
     * @param context            topology context of this task
     * @param collector          collector used to emit checkpoint tuples
     * @param checkpointInterval minimum time in milliseconds between checkpoints; the idle
     *                           sleep time is one tenth of this value
     * @param checkpointState    store holding the spout's state under the private state key
     */
    void open(TopologyContext context, SpoutOutputCollector collector, int checkpointInterval,
              KeyValueState<String, CheckPointState> checkpointState) {
        this.context = context;
        this.collector = collector;
        this.checkpointInterval = checkpointInterval;
        this.sleepInterval = checkpointInterval / 10;
        this.checkpointState = checkpointState;
        this.curTxState = checkpointState.get(TX_STATE_KEY);
        this.lastCheckpointTs = 0L;
        this.recoveryStepInProgress = false;
        this.checkpointStepInProgress = false;
        // Always start in recovery mode, whatever state was loaded.
        this.recovering = true;
    }

    /**
     * Emits the next recovery or checkpoint tuple if one is due; otherwise sleeps for
     * the idle interval.
     */
    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(this.sleepInterval);
        }
    }

    /**
     * Handles the ack of a checkpoint tuple. When the acked id matches the current
     * transaction id the state machine is advanced; otherwise the ack is only logged.
     * In both cases the in-progress mark is cleared so the next step can be emitted.
     *
     * @param msgId message id of the acked tuple; cast to {@link Number}
     */
    @Override
    public void ack(Object msgId) {
        LOG.debug("Got ack with txid {}, current txState {}", msgId, this.curTxState);
        if (this.curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (this.recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, this.curTxState.getTxid());
        }
        resetProgress();
    }

    /**
     * Handles the failure of a checkpoint tuple by switching to recovery mode (if not
     * already in it) and clearing the in-progress mark.
     *
     * @param msgId message id of the failed tuple
     */
    @Override
    public void fail(Object msgId) {
        LOG.debug("Got fail with msgid {}", msgId);
        if (!this.recovering) {
            LOG.debug("Checkpoint failed, will trigger recovery");
            this.recovering = true;
        }
        resetProgress();
    }

    /**
     * Declares the checkpoint stream with its transaction-id and action fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    /**
     * Tells whether a tuple arrived on the checkpoint stream.
     *
     * @param tuple the tuple to inspect
     * @return {@code true} if the tuple's source stream id equals {@link #CHECKPOINT_STREAM_ID}
     */
    public static boolean isCheckpoint(Tuple tuple) {
        return CHECKPOINT_STREAM_ID.equals(tuple.getSourceStreamId());
    }

    /**
     * Obtains the state store for this task (namespace {@code <componentId>-<taskId>})
     * and, if it holds no entry under the private state key yet, seeds and commits an
     * initial {@code COMMITTED} state with transaction id {@code -1}.
     */
    private KeyValueState<String, CheckPointState> loadCheckpointState(Map conf, TopologyContext ctx) {
        String namespace = ctx.getThisComponentId() + "-" + ctx.getThisTaskId();
        // The factory's return type is not parameterised with this spout's key/value types.
        @SuppressWarnings("unchecked")
        KeyValueState<String, CheckPointState> store =
                (KeyValueState<String, CheckPointState>) StateFactory.getState(namespace, conf, ctx);
        // Look the entry up once and reuse it for both the null check and the log message.
        CheckPointState txState = store.get(TX_STATE_KEY);
        if (txState == null) {
            txState = new CheckPointState(-1L, CheckPointState.State.COMMITTED);
            store.put(TX_STATE_KEY, txState);
            store.commit();
            LOG.debug("Initialized checkpoint spout state with txState {}", txState);
        } else {
            LOG.debug("Got checkpoint spout state {}", txState);
        }
        return store;
    }

    /**
     * Reads the checkpoint interval from the configuration and clamps it to at least
     * {@link #MIN_CHECKPOINT_INTERVAL_MS}. A missing key, or a key mapped to
     * {@code null}, is treated as unset and therefore yields the minimum.
     */
    private int loadCheckpointInterval(Map conf) {
        int configured = 0;
        Object raw = conf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL);
        if (raw != null) {
            configured = ((Number) raw).intValue();
        }
        int interval = Math.max(MIN_CHECKPOINT_INTERVAL_MS, configured);
        LOG.info("Checkpoint interval is {} millis", interval);
        return interval;
    }

    /** A recovery step is due when recovering and no recovery tuple is outstanding. */
    private boolean shouldRecover() {
        return this.recovering && !this.recoveryStepInProgress;
    }

    /**
     * A checkpoint step is due when not recovering, no checkpoint tuple is outstanding,
     * and either the current state is not {@code COMMITTED} (a checkpoint is partway
     * through) or the checkpoint interval has elapsed.
     */
    private boolean shouldCheckpoint() {
        if (this.recovering || this.checkpointStepInProgress) {
            return false;
        }
        return this.curTxState.getState() != CheckPointState.State.COMMITTED || checkpointIntervalElapsed();
    }

    /** True when more than the checkpoint interval has passed since the last checkpoint started. */
    private boolean checkpointIntervalElapsed() {
        return System.currentTimeMillis() - this.lastCheckpointTs > this.checkpointInterval;
    }

    /** Emits the recovery action for the current state. */
    private void handleRecovery() {
        LOG.debug("In recovery");
        emit(this.curTxState.getTxid(), this.curTxState.nextAction(true));
    }

    /**
     * Advances the state after a recovery tuple was acked. Recovery ends when
     * {@code nextState(true)} hands back the very same state object.
     */
    private void handleRecoveryAck() {
        CheckPointState next = this.curTxState.nextState(true);
        // Identity comparison is intentional: "same instance" signals nothing left to recover.
        if (next == this.curTxState) {
            LOG.debug("Recovery complete, current state {}", this.curTxState);
            this.recovering = false;
        } else {
            saveTxState(next);
        }
    }

    /**
     * Starts or continues a checkpoint. From {@code COMMITTED} the state is first moved
     * to its successor and the checkpoint timestamp is refreshed; then the action for
     * the (possibly updated) current state is emitted.
     */
    private void doCheckpoint() {
        LOG.debug("In checkpoint");
        if (this.curTxState.getState() == CheckPointState.State.COMMITTED) {
            saveTxState(this.curTxState.nextState(false));
            this.lastCheckpointTs = System.currentTimeMillis();
        }
        emit(this.curTxState.getTxid(), this.curTxState.nextAction(false));
    }

    /** Advances the state after a checkpoint tuple was acked. */
    private void handleCheckpointAck() {
        saveTxState(this.curTxState.nextState(false));
    }

    /**
     * Emits a checkpoint tuple {@code (txid, action)} on the checkpoint stream, using
     * the transaction id as the message id.
     */
    private void emit(long txid, CheckPointState.Action action) {
        LOG.debug("Current state {}, emitting txid {}, action {}", this.curTxState, txid, action);
        this.collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    /** Persists and commits the given state, then makes it the current in-memory state. */
    private void saveTxState(CheckPointState newState) {
        LOG.debug("saveTxState, current state {} -> new state {}", this.curTxState, newState);
        this.checkpointState.put(TX_STATE_KEY, newState);
        this.checkpointState.commit();
        this.curTxState = newState;
    }

    /** Marks a step as in progress for the current mode. */
    private void startProgress() {
        if (this.recovering) {
            this.recoveryStepInProgress = true;
        } else {
            this.checkpointStepInProgress = true;
        }
    }

    /**
     * Clears the in-progress mark for the mode in effect <em>now</em>. Because ack/fail
     * may flip {@code recovering} before calling this, the mark of the previous mode can
     * be left set; it is ignored while that mode is inactive and is cleared here on the
     * ack/fail that switches back to it.
     */
    private void resetProgress() {
        if (this.recovering) {
            this.recoveryStepInProgress = false;
        } else {
            this.checkpointStepInProgress = false;
        }
    }
}
