package org.apache.storm.executor.bolt;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.Task;
import org.apache.storm.executor.ExecutorTransfer;
import org.apache.storm.hooks.info.BoltAckInfo;
import org.apache.storm.hooks.info.BoltFailInfo;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/executor/bolt/BoltOutputCollectorImpl.class */
public class BoltOutputCollectorImpl implements IOutputCollector {
    private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
    private final BoltExecutor executor;
    private final Task task;
    private final int taskId;
    private final Random random;
    private final boolean isEventLoggers;
    private final ExecutorTransfer xsfer;
    private final boolean isDebug;
    private boolean ackingEnabled;

    public BoltOutputCollectorImpl(BoltExecutor boltExecutor, Task task, Random random, boolean z, boolean z2, boolean z3) {
        this.executor = boltExecutor;
        this.task = task;
        this.taskId = task.getTaskId().intValue();
        this.random = random;
        this.isEventLoggers = z;
        this.ackingEnabled = z2;
        this.isDebug = z3;
        this.xsfer = boltExecutor.getExecutorTransfer();
    }

    @Override // org.apache.storm.task.IOutputCollector
    public List<Integer> emit(String str, Collection<Tuple> collection, List<Object> list) {
        try {
            return boltEmit(str, collection, list, null);
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted when emiting tuple.");
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.storm.task.IOutputCollector
    public void emitDirect(int i, String str, Collection<Tuple> collection, List<Object> list) {
        try {
            boltEmit(str, collection, list, Integer.valueOf(i));
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted when emiting tuple.");
            throw new RuntimeException(e);
        }
    }

    private List<Integer> boltEmit(String str, Collection<Tuple> collection, List<Object> list, Integer num) throws InterruptedException {
        MessageId makeUnanchored;
        List<Integer> outgoingTasks = num != null ? this.task.getOutgoingTasks(num, str, list) : this.task.getOutgoingTasks(str, list);
        for (int i = 0; i < outgoingTasks.size(); i++) {
            Integer num2 = outgoingTasks.get(i);
            if (!this.ackingEnabled || collection == null) {
                makeUnanchored = MessageId.makeUnanchored();
            } else {
                HashMap hashMap = new HashMap();
                for (Tuple tuple : collection) {
                    Set<Long> keySet = tuple.getMessageId().getAnchorsToIds().keySet();
                    if (keySet.size() > 0) {
                        long generateId = MessageId.generateId(this.random);
                        ((TupleImpl) tuple).updateAckVal(generateId);
                        Iterator<Long> it = keySet.iterator();
                        while (it.hasNext()) {
                            putXor(hashMap, it.next(), Long.valueOf(generateId));
                        }
                    }
                }
                makeUnanchored = MessageId.makeId(hashMap);
            }
            this.xsfer.tryTransfer(new AddressedTuple(num2.intValue(), new TupleImpl(this.executor.getWorkerTopologyContext(), list, this.executor.getComponentId(), this.taskId, str, makeUnanchored)), this.executor.getPendingEmits());
        }
        if (this.isEventLoggers) {
            this.task.sendToEventLogger(this.executor, list, this.executor.getComponentId(), null, this.random, this.executor.getPendingEmits());
        }
        return outgoingTasks;
    }

    @Override // org.apache.storm.task.IOutputCollector
    public void ack(Tuple tuple) {
        if (this.ackingEnabled) {
            long ackVal = ((TupleImpl) tuple).getAckVal();
            for (Map.Entry<Long, Long> entry : tuple.getMessageId().getAnchorsToIds().entrySet()) {
                this.task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID, new Values(entry.getKey(), Long.valueOf(Utils.bitXor(entry.getValue(), Long.valueOf(ackVal)))), this.executor.getExecutorTransfer(), this.executor.getPendingEmits());
            }
            long tupleTimeDelta = tupleTimeDelta((TupleImpl) tuple);
            if (this.isDebug) {
                LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", new Object[]{Integer.valueOf(this.taskId), Long.valueOf(tupleTimeDelta), tuple});
            }
            if (!this.task.getUserContext().getHooks().isEmpty()) {
                new BoltAckInfo(tuple, this.taskId, Long.valueOf(tupleTimeDelta)).applyOn(this.task.getUserContext());
            }
            if (tupleTimeDelta >= 0) {
                this.executor.getStats().boltAckedTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), tupleTimeDelta, this.task.getTaskMetrics().getAcked(tuple.getSourceStreamId()));
            }
        }
    }

    @Override // org.apache.storm.task.IOutputCollector
    public void fail(Tuple tuple) {
        if (this.ackingEnabled) {
            Iterator<Long> it = tuple.getMessageId().getAnchors().iterator();
            while (it.hasNext()) {
                this.task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID, new Values(it.next()), this.executor.getExecutorTransfer(), this.executor.getPendingEmits());
            }
            long tupleTimeDelta = tupleTimeDelta((TupleImpl) tuple);
            if (this.isDebug) {
                LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", new Object[]{Integer.valueOf(this.taskId), Long.valueOf(tupleTimeDelta), tuple});
            }
            new BoltFailInfo(tuple, this.taskId, Long.valueOf(tupleTimeDelta)).applyOn(this.task.getUserContext());
            if (tupleTimeDelta >= 0) {
                this.executor.getStats().boltFailedTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), tupleTimeDelta, this.task.getTaskMetrics().getFailed(tuple.getSourceStreamId()));
            }
        }
    }

    @Override // org.apache.storm.task.IOutputCollector
    public void resetTimeout(Tuple tuple) {
        Iterator<Long> it = tuple.getMessageId().getAnchors().iterator();
        while (it.hasNext()) {
            this.task.sendUnanchored(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, new Values(it.next()), this.executor.getExecutorTransfer(), this.executor.getPendingEmits());
        }
    }

    @Override // org.apache.storm.task.IOutputCollector
    public void flush() {
        try {
            this.xsfer.flush();
        } catch (InterruptedException e) {
            LOG.warn("Bolt thread interrupted during flush()");
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.storm.task.IErrorReporter
    public void reportError(Throwable th) {
        this.executor.getErrorReportingMetrics().incrReportedErrorCount();
        this.executor.getReportError().report(th);
    }

    private long tupleTimeDelta(TupleImpl tupleImpl) {
        Long processSampleStartTime = tupleImpl.getProcessSampleStartTime();
        if (processSampleStartTime != null) {
            return Time.deltaMs(processSampleStartTime.longValue());
        }
        return -1L;
    }

    private void putXor(Map<Long, Long> map, Long l, Long l2) {
        Long l3 = map.get(l);
        if (l3 == null) {
            l3 = 0L;
        }
        map.put(l, Long.valueOf(Utils.bitXor(l3, l2)));
    }
}
