package org.apache.storm.trident.spout;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.ICommitterTridentSpout;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.topology.BatchInfo;
import org.apache.storm.trident.topology.ITridentBatchBolt;
import org.apache.storm.trident.topology.MasterBatchCoordinator;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.trident.tuple.ConsList;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/trident/spout/TridentSpoutExecutor.class */
/**
 * {@link ITridentBatchBolt} that drives the {@link ITridentSpout.Emitter} of a wrapped
 * {@link ITridentSpout}.
 *
 * <p>Each incoming tuple carries a {@link TransactionAttempt} at index 0. Depending on the
 * tuple's source stream the executor either commits the attempt, reports it as successful,
 * or asks the emitter to emit the batch for it (see {@link #execute(BatchInfo, Tuple)}).
 * Attempts whose batch has been emitted are remembered in {@code _activeBatches}, keyed by
 * transaction id.
 */
public class TridentSpoutExecutor implements ITridentBatchBolt {
    /** Field name inserted at index 0 of the declared output fields. */
    public static final String ID_FIELD = "$tx";
    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);

    /** Collector handed to the emitter; created in {@link #prepare}. */
    AddIdCollector _collector;
    /** The wrapped spout, supplied through the constructor. */
    ITridentSpout<Object> _spout;
    /** Emitter obtained from the spout in {@link #prepare}. */
    ITridentSpout.Emitter<Object> _emitter;
    /** Stream on which batches are emitted and for which output fields are declared. */
    String _streamName;
    /** Identifier passed to {@code ITridentSpout.getEmitter} in {@link #prepare}. */
    String _txStateId;
    /** Attempts whose batch has been emitted, keyed by transaction id (sorted). */
    TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<>();

    /**
     * {@link TridentCollector} that forwards to a {@link BatchOutputCollector} on a fixed
     * stream, combining every emitted value list with the currently set batch id.
     */
    private static class AddIdCollector implements TridentCollector {
        BatchOutputCollector _delegate;
        Object _id;
        String _stream;

        /**
         * @param str                  stream name every tuple is emitted on
         * @param batchOutputCollector collector that emits and error reports are forwarded to
         */
        public AddIdCollector(String str, BatchOutputCollector batchOutputCollector) {
            this._delegate = batchOutputCollector;
            this._stream = str;
        }

        /**
         * Sets the id that is combined with the values of subsequent {@link #emit} calls.
         *
         * @param obj the batch id
         */
        public void setBatch(Object obj) {
            this._id = obj;
        }

        /**
         * Emits a {@link ConsList} built from the current batch id and {@code list} on the
         * configured stream.
         *
         * @param list the tuple values produced by the emitter
         */
        @Override
        public void emit(List<Object> list) {
            this._delegate.emit(this._stream, new ConsList(this._id, list));
        }

        /**
         * Forwards the error to the delegate collector.
         *
         * @param th the error to report
         */
        @Override
        public void reportError(Throwable th) {
            this._delegate.reportError(th);
        }
    }

    /**
     * @param str           transaction state id, later passed to the spout's {@code getEmitter}
     * @param str2          name of the output stream
     * @param iTridentSpout the spout whose emitter this executor drives
     */
    public TridentSpoutExecutor(String str, String str2, ITridentSpout<Object> iTridentSpout) {
        this._txStateId = str;
        this._spout = iTridentSpout;
        this._streamName = str2;
    }

    /**
     * Obtains the emitter from the spout and creates the collector that wraps
     * {@code batchOutputCollector} for the configured stream.
     *
     * @param map                  configuration handed on to {@code getEmitter}
     * @param topologyContext      topology context handed on to {@code getEmitter}
     * @param batchOutputCollector collector that emitted batches are written to
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector) {
        this._emitter = this._spout.getEmitter(this._txStateId, map, topologyContext);
        this._collector = new AddIdCollector(this._streamName, batchOutputCollector);
    }

    /**
     * Handles one coordinator tuple, dispatching on its source stream id.
     *
     * <ul>
     *   <li>{@link MasterBatchCoordinator#COMMIT_STREAM_ID}: the attempt must equal the one
     *       recorded for its transaction id; it is then committed through the emitter (cast
     *       to {@link ICommitterTridentSpout.Emitter}) and removed from the active batches.</li>
     *   <li>{@link MasterBatchCoordinator#SUCCESS_STREAM_ID}: recorded attempts with a
     *       smaller transaction id are dropped and the emitter is told about the success.</li>
     *   <li>any other stream: the emitter emits the batch for the attempt, which is then
     *       recorded as active.</li>
     * </ul>
     *
     * @param batchInfo batch info; only its {@code batchId} is read, and only on the emit path
     * @param tuple     tuple with the {@link TransactionAttempt} at index 0; on the emit path
     *                  the value at index 1 is handed to {@code emitBatch}
     *                  (presumably coordinator metadata; confirm against the coordinator)
     * @throws FailedException if a commit arrives for an attempt that does not match the
     *                         recorded one (including when none is recorded)
     */
    @Override
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        String sourceStreamId = tuple.getSourceStreamId();
        if (sourceStreamId.equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if (!transactionAttempt.equals(this._activeBatches.get(transactionAttempt.getTransactionId()))) {
                throw new FailedException("Received commit for different transaction attempt");
            }
            ((ICommitterTridentSpout.Emitter) this._emitter).commit(transactionAttempt);
            this._activeBatches.remove(transactionAttempt.getTransactionId());
        } else if (sourceStreamId.equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // headMap is exclusive of its bound: only strictly earlier transactions are dropped.
            this._activeBatches.headMap(transactionAttempt.getTransactionId()).clear();
            this._emitter.success(transactionAttempt);
        } else {
            this._collector.setBatch(batchInfo.batchId);
            this._emitter.emitBatch(transactionAttempt, tuple.getValue(1), this._collector);
            // Recorded only after emitBatch returns, so a failed emit leaves no active entry.
            this._activeBatches.put(transactionAttempt.getTransactionId(), transactionAttempt);
        }
    }

    /** Closes the emitter obtained in {@link #prepare}. */
    @Override
    public void cleanup() {
        this._emitter.close();
    }

    /**
     * Declares the output stream with {@link #ID_FIELD} followed by the spout's own output
     * fields.
     *
     * @param outputFieldsDeclarer declarer that receives the stream definition
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Copy into a fresh list so the spout's own field list is not modified by the insert.
        List<String> fieldNames = new ArrayList<>(this._spout.getOutputFields().toList());
        fieldNames.add(0, ID_FIELD);
        outputFieldsDeclarer.declareStream(this._streamName, new Fields(fieldNames));
    }

    /**
     * @return the component configuration reported by the wrapped spout
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }

    /**
     * No-op.
     *
     * @param batchInfo ignored
     */
    @Override
    public void finishBatch(BatchInfo batchInfo) {
    }

    /**
     * This executor keeps no per-batch state.
     *
     * @param str ignored
     * @param obj ignored
     * @return always {@code null}
     */
    @Override
    public Object initBatchState(String str, Object obj) {
        return null;
    }
}
