package org.apache.storm.topology;

import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.storm.Config;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.State;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.DefaultEvictionContext;
import org.apache.storm.windowing.EventImpl;
import org.apache.storm.windowing.WindowLifecycleListener;
import org.apache.storm.windowing.persistence.WindowState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/topology/PersistentWindowedBoltExecutor.class */
/**
 * Executor that wraps an {@link IStatefulWindowedBolt} and keeps the window contents in a
 * {@link WindowState} built from three {@link KeyValueState} instances (window partitions,
 * partition ids and window system state).
 *
 * <p>Processing is gated on {@link #initState(State)}: {@link #start()} is a no-op and
 * {@link #execute(Tuple)} throws until the state has been initialized.
 *
 * @param <T> the state type handed to the wrapped bolt
 */
public class PersistentWindowedBoltExecutor<T extends State> extends WindowedBoltExecutor implements IStatefulBolt<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
    // The real collector; used by execute() to ack input tuples directly.
    private transient OutputCollector outputCollector;
    private transient WindowState<Tuple> state;
    // Set once initState() has completed; gates start(), execute() and prePrepare().
    private transient boolean stateInitialized;
    // Set by prePrepare(); checked by preCommit().
    private transient boolean prePrepared;
    private transient KeyValueState<String, Optional<?>> windowSystemState;

    /**
     * Collector wrapper whose {@link #ack(Tuple)} does nothing. It is the collector handed to
     * {@code doPrepare}, while {@link PersistentWindowedBoltExecutor#execute(Tuple)} acks each
     * input on the real collector itself.
     *
     * <p>NOTE: the decompiler reports this class was originally {@code private}; the widened
     * visibility is kept so existing references keep compiling.
     */
    public static class NoAckOutputCollector extends OutputCollector {
        public NoAckOutputCollector(OutputCollector outputCollector) {
            super(outputCollector);
        }

        /** Intentionally a no-op; see the class comment. */
        @Override // org.apache.storm.task.OutputCollector, org.apache.storm.task.IOutputCollector
        public void ack(Tuple tuple) {
        }
    }

    /**
     * Creates an executor around the given stateful windowed bolt.
     *
     * @param iStatefulWindowedBolt the bolt to wrap
     */
    public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> iStatefulWindowedBolt) {
        super(iStatefulWindowedBolt);
        this.statefulWindowedBolt = iStatefulWindowedBolt;
    }

    /**
     * Registers the classes used by the window state under
     * {@link Config#TOPOLOGY_STATE_KRYO_REGISTER}, obtains the three key-value states for this
     * task and delegates to the package-private {@code prepare} overload.
     *
     * @param map              topology configuration; its kryo registration entry is replaced
     * @param topologyContext  context of the current task
     * @param outputCollector  collector supplied by the framework
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor, org.apache.storm.task.IBolt
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        // Build a fresh list instead of appending to the configured one: the configured list
        // may be unmodifiable, and the key may be present with a null value.
        List<Object> registrations = new ArrayList<>();
        Object configured = map.get(Config.TOPOLOGY_STATE_KRYO_REGISTER);
        if (configured != null) {
            registrations.addAll((List<?>) configured);
        }
        registrations.add(ConcurrentLinkedQueue.class.getName());
        registrations.add(LinkedList.class.getName());
        registrations.add(AtomicInteger.class.getName());
        registrations.add(EventImpl.class.getName());
        registrations.add(WindowState.WindowPartition.class.getName());
        registrations.add(DefaultEvictionContext.class.getName());
        map.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
        prepare(map, topologyContext, outputCollector,
                getWindowState(map, topologyContext),
                getPartitionState(map, topologyContext),
                getWindowSystemState(map, topologyContext));
    }

    /**
     * Prepares the executor with explicitly supplied states.
     *
     * @param map              topology configuration
     * @param topologyContext  context of the current task
     * @param outputCollector  the real collector; wrapped in a {@link NoAckOutputCollector}
     *                         before being passed to {@code doPrepare}
     * @param keyValueState    state holding the window partitions
     * @param keyValueState2   state holding the partition ids
     * @param keyValueState3   state holding the window system state
     */
    void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector, KeyValueState<Long, WindowState.WindowPartition<Tuple>> keyValueState, KeyValueState<String, Deque<Long>> keyValueState2, KeyValueState<String, Optional<?>> keyValueState3) {
        this.outputCollector = outputCollector;
        this.windowSystemState = keyValueState3;
        this.state = new WindowState<>(keyValueState, keyValueState2, keyValueState3, this::getState, this.statefulWindowedBolt.maxEventsInMemory());
        doPrepare(map, topologyContext, new NoAckOutputCollector(outputCollector), this.state, true);
        restoreWindowSystemState();
    }

    /** Copies every entry of the window system state into a map and passes it to {@code restoreState}. */
    private void restoreWindowSystemState() {
        Map<String, Optional<?>> snapshot = new HashMap<>();
        for (Map.Entry<String, Optional<?>> entry : this.windowSystemState) {
            snapshot.put(entry.getKey(), entry.getValue());
        }
        restoreState(snapshot);
    }

    /**
     * Validates the window configuration.
     *
     * @param map        topology configuration
     * @param count      window length as a count, or {@code null}
     * @param duration   window length as a duration, or {@code null}
     * @param count2     not inspected by this override
     * @param duration2  not inspected by this override
     * @throws IllegalArgumentException if neither window length is given, or if the checkpoint
     *                                  interval exceeds the topology timeout
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor
    protected void validate(Map<String, Object> map, BaseWindowedBolt.Count count, BaseWindowedBolt.Duration duration, BaseWindowedBolt.Count count2, BaseWindowedBolt.Duration duration2) {
        if (count == null && duration == null) {
            throw new IllegalArgumentException("Window length is not specified");
        }
        int checkpointIntervalMillis = getCheckpointIntervalMillis(map);
        int topologyTimeoutMillis = getTopologyTimeoutMillis(map);
        if (checkpointIntervalMillis > topologyTimeoutMillis) {
            throw new IllegalArgumentException(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL + " value " + checkpointIntervalMillis
                    + " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS + " value " + topologyTimeoutMillis);
        }
    }

    /**
     * Reads {@link Config#TOPOLOGY_STATE_CHECKPOINT_INTERVAL} from the configuration.
     *
     * @return the configured value as an int, or {@link Integer#MAX_VALUE} when it is absent
     */
    private int getCheckpointIntervalMillis(Map<String, Object> map) {
        Object configured = map.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL);
        return configured == null ? Integer.MAX_VALUE : ((Number) configured).intValue();
    }

    /**
     * Starts the underlying executor only once the state is initialized; otherwise just logs.
     * {@link #initState(State)} calls this again after setting the flag.
     *
     * <p>NOTE: the decompiler reports this method was originally {@code protected}; the widened
     * visibility is kept for compatibility.
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor
    public void start() {
        if (this.stateInitialized) {
            super.start();
        } else {
            LOG.debug("Will invoke start after state is initialized");
        }
    }

    /**
     * Hands the tuple to the windowing logic and then acks it on the real collector.
     *
     * @throws IllegalStateException if called before {@link #initState(State)}
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor, org.apache.storm.task.IBolt
    public void execute(Tuple tuple) {
        if (!this.stateInitialized) {
            throw new IllegalStateException("execute invoked before initState with input tuple " + tuple);
        }
        super.execute(tuple);
        this.outputCollector.ack(tuple);
    }

    /**
     * Initializes the wrapped bolt's state, marks the executor initialized and calls {@link #start()}.
     *
     * @throws IllegalStateException if the state was already initialized
     */
    @Override // org.apache.storm.topology.IStatefulComponent
    public void initState(T t) {
        if (this.stateInitialized) {
            LOG.warn("initState invoked when the state is already initialized");
            throw new IllegalStateException("initState invoked when the state is already initialized");
        }
        this.statefulWindowedBolt.initState(t);
        this.stateInitialized = true;
        start();
    }

    /**
     * Forwards the pre-prepare to the wrapped bolt and prepares the window state for commit.
     *
     * @param j transaction id
     * @throws IllegalStateException if called before {@link #initState(State)}
     */
    @Override // org.apache.storm.topology.IStatefulComponent
    public void prePrepare(long j) {
        if (!this.stateInitialized) {
            LOG.warn("Cannot prepare before initState");
            throw new IllegalStateException("Cannot prepare before initState");
        }
        LOG.debug("Prepare streamState, txid {}", j);
        this.statefulWindowedBolt.prePrepare(j);
        this.state.prepareCommit(j);
        this.prePrepared = true;
    }

    /**
     * Forwards the pre-commit to the wrapped bolt and commits the window state.
     *
     * @param j transaction id
     * @throws IllegalStateException if the state is initialized but {@link #prePrepare(long)}
     *                               has not been called
     */
    @Override // org.apache.storm.topology.IStatefulComponent
    public void preCommit(long j) {
        if (!this.prePrepared && this.stateInitialized) {
            LOG.warn("preCommit before prePrepare in initialized state");
            throw new IllegalStateException("preCommit before prePrepare in initialized state");
        }
        LOG.debug("Commit streamState, txid {}", j);
        this.statefulWindowedBolt.preCommit(j);
        this.state.commit(j);
    }

    /**
     * Forwards the rollback to the wrapped bolt, rolls back the window state and, when the state
     * is initialized, reloads the window system state.
     */
    @Override // org.apache.storm.topology.IStatefulComponent
    public void preRollback() {
        LOG.debug("Rollback streamState, stateInitialized {}", this.stateInitialized);
        this.statefulWindowedBolt.preRollback();
        this.state.rollback(this.stateInitialized);
        if (this.stateInitialized) {
            restoreWindowSystemState();
        }
    }

    /**
     * Returns a listener that ignores expiry and, on activation, runs the bolt and then clears
     * the iterator pins held by the window state.
     *
     * <p>NOTE: the decompiler reports this method was originally {@code protected}; the widened
     * visibility is kept for compatibility.
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor
    public WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() { // from class: org.apache.storm.topology.PersistentWindowedBoltExecutor.1
            @Override // org.apache.storm.windowing.WindowLifecycleListener
            public void onExpiry(List<Tuple> list) {
                // Nothing to do on expiry.
            }

            @Override // org.apache.storm.windowing.WindowLifecycleListener
            public void onActivation(Supplier<Iterator<Tuple>> supplier, Supplier<Iterator<Tuple>> supplier2, Supplier<Iterator<Tuple>> supplier3, Long l) {
                PersistentWindowedBoltExecutor.this.boltExecute(supplier, supplier2, supplier3, l);
                PersistentWindowedBoltExecutor.this.state.clearIteratorPins();
            }
        };
    }

    /** Builds the state namespace {@code <componentId>-<taskId><suffix>} for the current task. */
    private static String stateNamespace(TopologyContext topologyContext, String suffix) {
        return topologyContext.getThisComponentId() + "-" + topologyContext.getThisTaskId() + suffix;
    }

    /** Looks up the state that stores the window partitions for this task. */
    @SuppressWarnings("unchecked") // StateFactory returns the base State type; callers fix the key/value types.
    private KeyValueState<Long, WindowState.WindowPartition<Tuple>> getWindowState(Map<String, Object> map, TopologyContext topologyContext) {
        return (KeyValueState<Long, WindowState.WindowPartition<Tuple>>) StateFactory.getState(stateNamespace(topologyContext, "-window"), map, topologyContext);
    }

    /** Looks up the state that stores the partition ids for this task. */
    @SuppressWarnings("unchecked") // StateFactory returns the base State type; callers fix the key/value types.
    private KeyValueState<String, Deque<Long>> getPartitionState(Map<String, Object> map, TopologyContext topologyContext) {
        return (KeyValueState<String, Deque<Long>>) StateFactory.getState(stateNamespace(topologyContext, "-window-partitions"), map, topologyContext);
    }

    /** Looks up the state that stores the window system state for this task. */
    @SuppressWarnings("unchecked") // StateFactory returns the base State type; callers fix the key/value types.
    private KeyValueState<String, Optional<?>> getWindowSystemState(Map<String, Object> map, TopologyContext topologyContext) {
        return (KeyValueState<String, Optional<?>>) StateFactory.getState(stateNamespace(topologyContext, "-window-systemstate"), map, topologyContext);
    }
}
