package org.apache.storm.daemon;

import java.util.Map;
import org.apache.storm.Constants;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.TupleUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/Acker.class */
/**
 * Bolt that tracks, per tuple id, an XOR-accumulated value together with the spout task that
 * should be notified about the outcome.
 *
 * <p>Messages arrive on the {@code __ack_init}, {@code __ack_ack}, {@code __ack_fail} and
 * {@code __ack_reset_timeout} streams. For each of them the first tuple field is used as the
 * key into {@link #pending}. Once the spout task for a key is known, an entry whose accumulated
 * value is zero produces a direct emit on {@link #ACKER_ACK_STREAM_ID}, an entry flagged as
 * failed produces one on {@link #ACKER_FAIL_STREAM_ID}, and a reset-timeout request is forwarded
 * on {@link #ACKER_RESET_TIMEOUT_STREAM_ID}.
 *
 * <p>Tick tuples rotate {@link #pending}; the entries handed back by the rotation are only
 * counted and logged as timed out.
 */
public class Acker implements IBolt {
    public static final String ACKER_COMPONENT_ID = "__acker";
    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
    /** Bucket count handed to the {@link RotatingMap} that backs {@link #pending}. */
    public static final int TIMEOUT_BUCKET_NUM = 3;
    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
    private static final long serialVersionUID = 4430906880683183091L;
    /** Collector received in {@link #prepare}; used for direct emits, flushing and acking. */
    private OutputCollector collector;
    /** In-flight entries keyed by the first field of the incoming tuples. */
    private RotatingMap<Object, AckObject> pending;

    /**
     * Stores the collector and creates an empty pending map with {@link #TIMEOUT_BUCKET_NUM}
     * buckets.
     *
     * @param map             topology configuration (not read here)
     * @param topologyContext topology context (not read here)
     * @param outputCollector collector used by {@link #execute(Tuple)}
     */
    @Override // org.apache.storm.task.IBolt
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
        pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
    }

    /**
     * Processes one incoming tuple.
     *
     * <ul>
     *   <li>Tick tuple: rotates the pending map, logs how many entries the rotation returned,
     *       and returns without acking.</li>
     *   <li>System flush stream: flushes the collector and returns without acking.</li>
     *   <li>Unknown stream: logs a warning and returns without acking.</li>
     *   <li>Init / ack / fail / reset-timeout streams: updates the entry for the tuple's first
     *       field, possibly notifies the spout task, then acks the incoming tuple.</li>
     * </ul>
     *
     * @param tuple the incoming tuple
     * @throws IllegalStateException if the notification branch is entered although none of its
     *                               three triggering conditions holds
     */
    @Override // org.apache.storm.task.IBolt
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            LOG.debug("Number of timeout tuples:{}", pending.rotate().size());
            return;
        }

        // The id and the current entry are looked up before the stream is inspected.
        final String streamId = tuple.getSourceStreamId();
        final Object rootId = tuple.getValue(0);
        AckObject entry = pending.get(rootId);
        boolean resetTimeout = false;

        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            entry = trackIfAbsent(rootId, entry);
            entry.updateAck(tuple.getLong(1));
            entry.spoutTask = tuple.getInteger(2);
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            entry = trackIfAbsent(rootId, entry);
            entry.updateAck(tuple.getLong(1));
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            // A fail may be the first message seen for this id, so an entry is created on demand.
            entry = (entry != null) ? entry : new AckObject();
            entry.failed = true;
            // Unlike init/ack, the entry is (re-)put unconditionally.
            // NOTE(review): presumably this refreshes its position in the rotating map - confirm.
            pending.put(rootId, entry);
        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
            resetTimeout = true;
            entry = (entry != null) ? entry : new AckObject();
            pending.put(rootId, entry);
        } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            collector.flush();
            return;
        } else {
            LOG.warn("Unknown source stream {} from task-{}", streamId, tuple.getSourceTask());
            return;
        }

        // spoutTask stays at -1 until an init message has been seen; nothing is emitted before.
        final int spoutTask = entry.spoutTask;
        if (spoutTask >= 0 && (entry.val == 0 || entry.failed || resetTimeout)) {
            // Payload: the id plus the milliseconds elapsed since the entry was created.
            final Values report = new Values(rootId, getTimeDeltaMillis(entry.startTime));
            if (entry.val == 0) {
                pending.remove(rootId);
                collector.emitDirect(spoutTask, ACKER_ACK_STREAM_ID, report);
            } else if (entry.failed) {
                pending.remove(rootId);
                collector.emitDirect(spoutTask, ACKER_FAIL_STREAM_ID, report);
            } else if (resetTimeout) {
                // The entry stays in the pending map in this case.
                collector.emitDirect(spoutTask, ACKER_RESET_TIMEOUT_STREAM_ID, report);
            } else {
                throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
            }
        }

        collector.ack(tuple);
    }

    /** Logs that cleanup ran; no other work is done. */
    @Override // org.apache.storm.task.IBolt
    public void cleanup() {
        LOG.info("Acker: cleanup successfully");
    }

    /**
     * Returns {@code existing} when it is non-null; otherwise creates a fresh entry, stores it in
     * {@link #pending} under {@code id} and returns it. An existing entry is not re-put.
     */
    private AckObject trackIfAbsent(Object id, AckObject existing) {
        if (existing != null) {
            return existing;
        }
        final AckObject created = new AckObject();
        pending.put(id, created);
        return created;
    }

    /** Milliseconds between {@code startMillis} and {@link Time#currentTimeMillis()}. */
    private long getTimeDeltaMillis(long startMillis) {
        final long now = Time.currentTimeMillis();
        return now - startMillis;
    }

    /** Mutable per-id bookkeeping record held in {@link Acker#pending}. */
    private static class AckObject {
        /** Running XOR of every value passed to {@link #updateAck(Long)}; starts at zero. */
        public long val = 0L;
        /** {@link Time#currentTimeMillis()} at the moment this record was created. */
        public long startTime = Time.currentTimeMillis();
        /** Task id to notify; {@code -1} until an init message supplies it. */
        public int spoutTask = -1;
        /** Set once a fail message has been received for this id. */
        public boolean failed = false;

        /** XORs {@code value} into {@link #val}. */
        public void updateAck(Long value) {
            val = Utils.bitXor(val, value);
        }
    }
}
