package org.apache.storm.topology;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.CountEvictionPolicy;
import org.apache.storm.windowing.CountTriggerPolicy;
import org.apache.storm.windowing.EvictionPolicy;
import org.apache.storm.windowing.TimeEvictionPolicy;
import org.apache.storm.windowing.TimeTriggerPolicy;
import org.apache.storm.windowing.TriggerPolicy;
import org.apache.storm.windowing.TupleWindowImpl;
import org.apache.storm.windowing.WaterMarkEventGenerator;
import org.apache.storm.windowing.WatermarkCountEvictionPolicy;
import org.apache.storm.windowing.WatermarkCountTriggerPolicy;
import org.apache.storm.windowing.WatermarkTimeEvictionPolicy;
import org.apache.storm.windowing.WatermarkTimeTriggerPolicy;
import org.apache.storm.windowing.WindowLifecycleListener;
import org.apache.storm.windowing.WindowManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/topology/WindowedBoltExecutor.class */
public class WindowedBoltExecutor implements IRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000;
    private static final int DEFAULT_MAX_LAG_MS = 0;
    private final IWindowedBolt bolt;
    private transient WindowedOutputCollector windowedOutputCollector;
    private transient WindowLifecycleListener<Tuple> listener;
    private transient WindowManager<Tuple> windowManager;
    private transient int maxLagMs;
    private transient String tupleTsFieldName;
    private transient TriggerPolicy<Tuple> triggerPolicy;
    private transient EvictionPolicy<Tuple> evictionPolicy;
    transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;

    /* loaded from: input_file:org/apache/storm/topology/WindowedBoltExecutor$WindowedOutputCollector.class */
    private static class WindowedOutputCollector extends OutputCollector {
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector iOutputCollector) {
            super(iOutputCollector);
        }

        void setContext(List<Tuple> list) {
            this.inputTuples = list;
        }

        @Override // org.apache.storm.task.OutputCollector
        public List<Integer> emit(String str, List<Object> list) {
            return emit(str, this.inputTuples, list);
        }

        @Override // org.apache.storm.task.OutputCollector
        public void emitDirect(int i, String str, List<Object> list) {
            emitDirect(i, str, this.inputTuples, list);
        }
    }

    public WindowedBoltExecutor(IWindowedBolt iWindowedBolt) {
        this.bolt = iWindowedBolt;
    }

    private int getTopologyTimeoutMillis(Map map) {
        if (map.get("topology.enable.message.timeouts") != null && !((Boolean) map.get("topology.enable.message.timeouts")).booleanValue()) {
            return Integer.MAX_VALUE;
        }
        int i = 0;
        if (map.get("topology.message.timeout.secs") != null) {
            i = ((Number) map.get("topology.message.timeout.secs")).intValue();
        }
        return i * 1000;
    }

    private int getMaxSpoutPending(Map map) {
        int i = Integer.MAX_VALUE;
        if (map.get("topology.max.spout.pending") != null) {
            i = ((Number) map.get("topology.max.spout.pending")).intValue();
        }
        return i;
    }

    private void ensureDurationLessThanTimeout(int i, int i2) {
        if (i > i2) {
            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + i + " is more than topology.message.timeout.secs value " + i2);
        }
    }

    private void ensureCountLessThanMaxPending(int i, int i2) {
        if (i > i2) {
            throw new IllegalArgumentException("Window count (length + sliding interval) value " + i + " is more than topology.max.spout.pending value " + i2);
        }
    }

    private void validate(Map map, BaseWindowedBolt.Count count, BaseWindowedBolt.Duration duration, BaseWindowedBolt.Count count2, BaseWindowedBolt.Duration duration2) {
        int topologyTimeoutMillis = getTopologyTimeoutMillis(map);
        int maxSpoutPending = getMaxSpoutPending(map);
        if (count == null && duration == null) {
            throw new IllegalArgumentException("Window length is not specified");
        }
        if (duration != null && duration2 != null) {
            ensureDurationLessThanTimeout(duration.value + duration2.value, topologyTimeoutMillis);
        } else if (duration != null) {
            ensureDurationLessThanTimeout(duration.value, topologyTimeoutMillis);
        } else if (duration2 != null) {
            ensureDurationLessThanTimeout(duration2.value, topologyTimeoutMillis);
        }
        if (count != null && count2 != null) {
            ensureCountLessThanMaxPending(count.value + count2.value, maxSpoutPending);
        } else if (count != null) {
            ensureCountLessThanMaxPending(count.value, maxSpoutPending);
        } else if (count2 != null) {
            ensureCountLessThanMaxPending(count2.value, maxSpoutPending);
        }
    }

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> windowLifecycleListener, Map map, TopologyContext topologyContext) {
        WindowManager<Tuple> windowManager = new WindowManager<>(windowLifecycleListener);
        BaseWindowedBolt.Duration duration = null;
        BaseWindowedBolt.Count count = null;
        BaseWindowedBolt.Duration duration2 = null;
        BaseWindowedBolt.Count count2 = null;
        if (map.containsKey("topology.bolts.window.length.count")) {
            count = new BaseWindowedBolt.Count(((Number) map.get("topology.bolts.window.length.count")).intValue());
        } else if (map.containsKey("topology.bolts.window.length.duration.ms")) {
            duration = new BaseWindowedBolt.Duration(((Number) map.get("topology.bolts.window.length.duration.ms")).intValue(), TimeUnit.MILLISECONDS);
        }
        if (map.containsKey("topology.bolts.window.sliding.interval.count")) {
            count2 = new BaseWindowedBolt.Count(((Number) map.get("topology.bolts.window.sliding.interval.count")).intValue());
        } else if (map.containsKey("topology.bolts.window.sliding.interval.duration.ms")) {
            duration2 = new BaseWindowedBolt.Duration(((Number) map.get("topology.bolts.window.sliding.interval.duration.ms")).intValue(), TimeUnit.MILLISECONDS);
        } else {
            count2 = new BaseWindowedBolt.Count(1);
        }
        if (map.containsKey("topology.bolts.tuple.timestamp.field.name")) {
            this.tupleTsFieldName = (String) map.get("topology.bolts.tuple.timestamp.field.name");
            if (map.containsKey("topology.bolts.tuple.timestamp.max.lag.ms")) {
                this.maxLagMs = ((Number) map.get("topology.bolts.tuple.timestamp.max.lag.ms")).intValue();
            } else {
                this.maxLagMs = 0;
            }
            this.waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, map.containsKey("topology.bolts.watermark.event.interval.ms") ? ((Number) map.get("topology.bolts.watermark.event.interval.ms")).intValue() : 1000, this.maxLagMs, getComponentStreams(topologyContext));
        }
        validate(map, count, duration, count2, duration2);
        this.evictionPolicy = getEvictionPolicy(count, duration, windowManager);
        this.triggerPolicy = getTriggerPolicy(count2, duration2, windowManager, this.evictionPolicy);
        windowManager.setEvictionPolicy(this.evictionPolicy);
        windowManager.setTriggerPolicy(this.triggerPolicy);
        return windowManager;
    }

    private Set<GlobalStreamId> getComponentStreams(TopologyContext topologyContext) {
        HashSet hashSet = new HashSet();
        for (GlobalStreamId globalStreamId : topologyContext.getThisSources().keySet()) {
            if (!globalStreamId.get_streamId().equals(CheckpointSpout.CHECKPOINT_STREAM_ID)) {
                hashSet.add(globalStreamId);
            }
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void start() {
        if (this.waterMarkEventGenerator != null) {
            LOG.debug("Starting waterMarkEventGenerator");
            this.waterMarkEventGenerator.start();
        }
        LOG.debug("Starting trigger policy");
        this.triggerPolicy.start();
    }

    private boolean isTupleTs() {
        return this.tupleTsFieldName != null;
    }

    private TriggerPolicy<Tuple> getTriggerPolicy(BaseWindowedBolt.Count count, BaseWindowedBolt.Duration duration, WindowManager<Tuple> windowManager, EvictionPolicy<Tuple> evictionPolicy) {
        return count != null ? isTupleTs() ? new WatermarkCountTriggerPolicy(count.value, windowManager, evictionPolicy, windowManager) : new CountTriggerPolicy(count.value, windowManager, evictionPolicy) : isTupleTs() ? new WatermarkTimeTriggerPolicy(duration.value, windowManager, evictionPolicy, windowManager) : new TimeTriggerPolicy(duration.value, windowManager, evictionPolicy);
    }

    private EvictionPolicy<Tuple> getEvictionPolicy(BaseWindowedBolt.Count count, BaseWindowedBolt.Duration duration, WindowManager<Tuple> windowManager) {
        return count != null ? isTupleTs() ? new WatermarkCountEvictionPolicy(count.value, windowManager) : new CountEvictionPolicy(count.value) : isTupleTs() ? new WatermarkTimeEvictionPolicy(duration.value, this.maxLagMs) : new TimeEvictionPolicy(duration.value);
    }

    @Override // org.apache.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.windowedOutputCollector = new WindowedOutputCollector(outputCollector);
        this.bolt.prepare(map, topologyContext, this.windowedOutputCollector);
        this.listener = newWindowLifecycleListener();
        this.windowManager = initWindowManager(this.listener, map, topologyContext);
        start();
        LOG.debug("Initialized window manager {} ", this.windowManager);
    }

    @Override // org.apache.storm.task.IBolt
    public void execute(Tuple tuple) {
        if (!isTupleTs()) {
            this.windowManager.add(tuple);
            return;
        }
        long longValue = tuple.getLongByField(this.tupleTsFieldName).longValue();
        if (this.waterMarkEventGenerator.track(tuple.getSourceGlobalStreamId(), longValue)) {
            this.windowManager.add(tuple, longValue);
        } else {
            LOG.info("Received a late tuple {} with ts {}. This will not processed.", tuple, Long.valueOf(longValue));
        }
    }

    @Override // org.apache.storm.task.IBolt
    public void cleanup() {
        this.windowManager.shutdown();
        this.bolt.cleanup();
    }

    @Override // org.apache.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        this.bolt.declareOutputFields(outputFieldsDeclarer);
    }

    @Override // org.apache.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        return this.bolt.getComponentConfiguration();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() { // from class: org.apache.storm.topology.WindowedBoltExecutor.1
            public void onExpiry(List<Tuple> list) {
                Iterator<Tuple> it = list.iterator();
                while (it.hasNext()) {
                    WindowedBoltExecutor.this.windowedOutputCollector.ack(it.next());
                }
            }

            public void onActivation(List<Tuple> list, List<Tuple> list2, List<Tuple> list3) {
                WindowedBoltExecutor.this.windowedOutputCollector.setContext(list);
                WindowedBoltExecutor.this.bolt.execute(new TupleWindowImpl(list, list2, list3));
            }
        };
    }
}
