package org.apache.storm.druid.bolt;

import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.util.Map;
import org.apache.storm.druid.bolt.DruidConfig;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/druid/bolt/DruidBeamBolt.class */
public class DruidBeamBolt<E> extends BaseTickTupleAwareRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    private volatile OutputCollector collector;
    private DruidBeamFactory<E> beamFactory;
    private DruidConfig druidConfig;
    private Tranquilizer<E> tranquilizer = null;
    private ITupleDruidEventMapper<E> druidEventMapper;

    public DruidBeamBolt(DruidBeamFactory<E> druidBeamFactory, ITupleDruidEventMapper<E> iTupleDruidEventMapper, DruidConfig.Builder builder) {
        this.beamFactory = null;
        this.druidConfig = null;
        this.druidEventMapper = null;
        this.beamFactory = druidBeamFactory;
        this.druidConfig = builder.build();
        this.druidEventMapper = iTupleDruidEventMapper;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.tranquilizer = Tranquilizer.builder().maxBatchSize(this.druidConfig.getMaxBatchSize()).maxPendingBatches(this.druidConfig.getMaxPendingBatches()).lingerMillis(this.druidConfig.getLingerMillis()).blockOnFull(this.druidConfig.isBlockOnFull()).build(this.beamFactory.makeBeam(map, topologyContext));
        this.tranquilizer.start();
    }

    protected void process(final Tuple tuple) {
        final E event = this.druidEventMapper.getEvent(tuple);
        Future send = this.tranquilizer.send(event);
        LOG.debug("Sent tuple : [{}]", event);
        send.addEventListener(new FutureEventListener() { // from class: org.apache.storm.druid.bolt.DruidBeamBolt.1
            public void onFailure(Throwable th) {
                if (!(th instanceof MessageDroppedException)) {
                    DruidBeamBolt.this.collector.fail(tuple);
                    DruidBeamBolt.LOG.error("Tuple Processing Failed : [{}]", event, th);
                    return;
                }
                DruidBeamBolt.this.collector.ack(tuple);
                DruidBeamBolt.LOG.debug("Tuple Dropped due to MessageDroppedException {} : [{}]", th.getMessage(), event);
                if (DruidBeamBolt.this.druidConfig.getDiscardStreamId() != null) {
                    DruidBeamBolt.this.collector.emit(DruidBeamBolt.this.druidConfig.getDiscardStreamId(), new Values(new Object[]{tuple, Long.valueOf(System.currentTimeMillis())}));
                }
            }

            public void onSuccess(Object obj) {
                DruidBeamBolt.this.collector.ack(tuple);
                DruidBeamBolt.LOG.debug("Tuple Processing Success : [{}]", event);
            }
        });
    }

    public void cleanup() {
        this.tranquilizer.stop();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(this.druidConfig.getDiscardStreamId(), new Fields(new String[]{"tuple", "timestamp"}));
    }
}
