package org.apache.storm.topology;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.State;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.WindowLifecycleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/topology/StatefulWindowedBoltExecutor.class */
public class StatefulWindowedBoltExecutor<T extends State> extends WindowedBoltExecutor implements IStatefulBolt<T> {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(StatefulWindowedBoltExecutor.class);
    // The wrapped stateful windowed bolt; initState(T) is forwarded to it.
    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
    // Name of the tuple field carrying the long message id; read from the topology config in init().
    private transient String msgIdFieldName;
    // Context handed to prepare(); used in initState() to enumerate source streams and their tasks.
    private transient TopologyContext topologyContext;
    // Collector handed to prepare(); used to ack tuples that are skipped during recovery.
    private transient OutputCollector outputCollector;
    // Key-value state mapping each (source task, stream) to its last expired / last evaluated message ids.
    private transient KeyValueState<TaskStream, WindowState> streamState;
    // Tuples buffered while recovery is in progress; replayed once recovery completes.
    private transient List<Tuple> pendingTuples;
    // Saved window states still awaiting recovery; a non-empty map means "recovering".
    private transient Map<TaskStream, WindowState> recoveryStates;
    // Set to true at the end of initState(); reset to false by init().
    private transient boolean stateInitialized;
    // Set to true once prePrepare() has called streamState.prepareCommit(); reset to false by init().
    private transient boolean prePrepared;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/topology/StatefulWindowedBoltExecutor$TaskStream.class */
    /**
     * Key identifying one input stream as emitted by one particular source task:
     * the pair (source task id, global stream id).
     *
     * <p>Equality and hashing are value-based over both fields, so instances can be
     * used as map / state keys.
     */
    public static class TaskStream {
        private int sourceTask;
        private GlobalStreamId streamId;

        /**
         * Creates an instance with default field values (task 0, null stream id).
         * NOTE(review): presumably present for serialization frameworks - confirm.
         */
        TaskStream() {
        }

        /**
         * @param i              id of the task that emitted the stream
         * @param globalStreamId global id of the stream
         */
        TaskStream(int i, GlobalStreamId globalStreamId) {
            sourceTask = i;
            streamId = globalStreamId;
        }

        /** Two task streams are equal when they have the same class, source task and stream id. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            TaskStream other = (TaskStream) obj;
            if (other.sourceTask != sourceTask) {
                return false;
            }
            if (streamId == null) {
                return other.streamId == null;
            }
            return streamId.equals(other.streamId);
        }

        /** Hash consistent with {@link #equals(Object)}: 31 * hash(streamId) + sourceTask. */
        @Override
        public int hashCode() {
            int streamHash = 0;
            if (streamId != null) {
                streamHash = streamId.hashCode();
            }
            return 31 * streamHash + sourceTask;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TaskStream{sourceTask=");
            text.append(sourceTask);
            text.append(", streamId=");
            text.append(streamId);
            text.append('}');
            return text.toString();
        }

        /**
         * Builds the key for the stream a tuple arrived on.
         *
         * @param tuple the received tuple
         * @return key made of the tuple's source task and source global stream id
         */
        static TaskStream fromTuple(Tuple tuple) {
            int task = tuple.getSourceTask();
            GlobalStreamId stream = tuple.getSourceGlobalStreamId();
            return new TaskStream(task, stream);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/topology/StatefulWindowedBoltExecutor$WindowState.class */
    /**
     * Per-stream window bookkeeping: the highest message id that has expired from
     * the window and the highest message id that has been evaluated.
     *
     * <p>Equality and hashing are value-based over both fields.
     */
    public static class WindowState {
        private long lastExpired;
        private long lastEvaluated;

        /**
         * Creates an instance with both ids at 0.
         * NOTE(review): presumably present for serialization frameworks - confirm.
         */
        WindowState() {
        }

        /**
         * @param j  last expired message id
         * @param j2 last evaluated message id
         */
        WindowState(long j, long j2) {
            lastExpired = j;
            lastEvaluated = j2;
        }

        /** Two states are equal when they have the same class and both ids match. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            WindowState other = (WindowState) obj;
            if (other.lastExpired != lastExpired) {
                return false;
            }
            return other.lastEvaluated == lastEvaluated;
        }

        /** Hash consistent with {@link #equals(Object)}: folds each long to an int and combines with 31. */
        @Override
        public int hashCode() {
            int expiredHash = (int) (lastExpired ^ (lastExpired >>> 32));
            int evaluatedHash = (int) (lastEvaluated ^ (lastEvaluated >>> 32));
            return 31 * expiredHash + evaluatedHash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("WindowState{lastExpired=");
            text.append(lastExpired);
            text.append(", lastEvaluated=");
            text.append(lastEvaluated);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Wraps the given stateful windowed bolt.
     *
     * <p>The bolt is passed to the {@link WindowedBoltExecutor} superclass and also kept
     * locally so that {@link #initState} can forward the state to it.
     *
     * @param iStatefulWindowedBolt the bolt to execute
     */
    public StatefulWindowedBoltExecutor(IStatefulWindowedBolt<T> iStatefulWindowedBolt) {
        super(iStatefulWindowedBolt);
        this.statefulWindowedBolt = iStatefulWindowedBolt;
    }

    /**
     * Prepares the executor, obtaining the per-task window state via
     * {@link #getWindowState} and delegating to the four-argument overload.
     *
     * @param map             topology configuration
     * @param topologyContext context of this task
     * @param outputCollector collector used for emitting / acking
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor, org.apache.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        KeyValueState<TaskStream, WindowState> windowStore = getWindowState(map, topologyContext);
        prepare(map, topologyContext, outputCollector, windowStore);
    }

    /**
     * Prepares the executor with an explicitly supplied window state store.
     *
     * @param map             topology configuration
     * @param topologyContext context of this task
     * @param outputCollector collector used for emitting / acking
     * @param keyValueState   store holding the per-stream {@link WindowState}
     * @throws IllegalArgumentException if the message id field name is not configured (raised by init)
     */
    void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector, KeyValueState<TaskStream, WindowState> keyValueState) {
        // Local fields are initialised before the superclass is prepared.
        // NOTE(review): presumably super.prepare may call back into the overridden start() /
        // newWindowLifecycleListener(), which read these fields - confirm against WindowedBoltExecutor.
        init(map, topologyContext, outputCollector, keyValueState);
        super.prepare(map, topologyContext, outputCollector);
    }

    /**
     * Resets all transient fields of this executor for a fresh run.
     *
     * @param map             topology configuration; must contain the message id field name key
     * @param topologyContext context of this task
     * @param outputCollector collector used for emitting / acking
     * @param keyValueState   store holding the per-stream {@link WindowState}
     * @throws IllegalArgumentException if the message id field name key is absent from {@code map}
     */
    private void init(Map map, TopologyContext topologyContext, OutputCollector outputCollector, KeyValueState<TaskStream, WindowState> keyValueState) {
        boolean msgIdFieldConfigured = map.containsKey(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
        if (!msgIdFieldConfigured) {
            throw new IllegalArgumentException("topology.bolts.message.id.field.name is not set");
        }
        msgIdFieldName = (String) map.get(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);

        // Collaborators supplied by the caller.
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;
        streamState = keyValueState;

        // Fresh recovery bookkeeping.
        pendingTuples = new ArrayList<>();
        recoveryStates = new HashMap<>();
        stateInitialized = false;
        prePrepared = false;
    }

    /**
     * Processes one input tuple. While recovery is in progress the tuple goes through
     * {@link #handleRecovery}; otherwise it is handed straight to the superclass.
     *
     * @param tuple the input tuple
     * @throws IllegalStateException if called before {@link #initState}
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor, org.apache.storm.task.IBolt
    public void execute(Tuple tuple) {
        if (!isStateInitialized()) {
            throw new IllegalStateException("execute invoked before initState with input tuple " + tuple);
        }
        if (!isRecovering()) {
            super.execute(tuple);
            return;
        }
        handleRecovery(tuple);
    }

    /**
     * Starts the superclass only when state has been initialized and no recovery is
     * pending; otherwise the start is deferred (it is triggered later from
     * {@link #clearRecoveryState} or {@link #initState}).
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor
    protected void start() {
        if (isStateInitialized() && !isRecovering()) {
            super.start();
            return;
        }
        LOG.debug("Will invoke start after recovery is complete.");
    }

    /**
     * Routes a tuple received while recovery is in progress, based on the saved
     * {@link WindowState} of the stream it arrived on:
     * <ul>
     *   <li>no saved state for the stream: buffer the tuple in {@code pendingTuples};</li>
     *   <li>message id &lt;= lastExpired: ack and drop the tuple;</li>
     *   <li>message id &lt;= lastEvaluated: pass the tuple to the superclass;</li>
     *   <li>otherwise: buffer the tuple and clear the stream's recovery state.</li>
     * </ul>
     *
     * @param tuple the input tuple
     */
    private void handleRecovery(Tuple tuple) {
        long id = getMsgId(tuple);
        TaskStream source = TaskStream.fromTuple(tuple);
        WindowState saved = recoveryStates.get(source);
        LOG.debug("handleRecovery, recoveryStates {}", recoveryStates);

        if (saved == null) {
            pendingTuples.add(tuple);
            return;
        }

        LOG.debug("Tuple msgid {}, saved state {}", id, saved);
        if (id <= saved.lastExpired) {
            // Already expired before the restart: nothing to do but ack.
            LOG.debug("Ignoring tuple since msg id {} <= lastExpired id {}", id, saved.lastExpired);
            outputCollector.ack(tuple);
        } else if (id <= saved.lastEvaluated) {
            super.execute(tuple);
        } else {
            LOG.debug("Tuple msg id {} > lastEvaluated id {}, adding to pendingTuples and clearing recovery state for taskStream {}", id, saved.lastEvaluated, source);
            pendingTuples.add(tuple);
            clearRecoveryState(source);
        }
    }

    /**
     * Marks one stream as recovered. When it was the last stream still recovering,
     * starts the superclass and replays every tuple buffered in {@code pendingTuples}.
     *
     * <p>The buffer is emptied after the replay: once recovery is complete nothing reads
     * it again, so keeping the entries would only pin the replayed tuples in memory for
     * the remaining lifetime of the task.
     *
     * @param taskStream the stream whose recovery state is to be removed
     */
    private void clearRecoveryState(TaskStream taskStream) {
        this.recoveryStates.remove(taskStream);
        if (isRecovering()) {
            // Other streams are still recovering; keep buffering.
            return;
        }
        super.start();
        LOG.debug("Recovery complete, processing {} pending tuples", Integer.valueOf(this.pendingTuples.size()));
        for (Tuple pending : this.pendingTuples) {
            super.execute(pending);
        }
        // Release the references to the replayed tuples.
        this.pendingTuples.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @return {@code true} while at least one stream still has an entry in {@code recoveryStates}
     */
    public boolean isRecovering() {
        boolean nothingLeftToRecover = recoveryStates.isEmpty();
        return !nothingLeftToRecover;
    }

    /**
     * @return the {@code stateInitialized} flag, which {@link #initState} sets to {@code true}
     */
    private boolean isStateInitialized() {
        return stateInitialized;
    }

    /**
     * Initializes the wrapped bolt's state and loads the saved window state of every
     * (source task, source stream) pair into {@code recoveryStates}. Repeated calls
     * after a successful initialization are ignored with a warning.
     *
     * @param t the state to hand to the wrapped bolt
     */
    @Override // org.apache.storm.topology.IStatefulComponent
    public void initState(T t) {
        if (stateInitialized) {
            LOG.warn("State is already initialized. Ignoring initState");
            return;
        }
        statefulWindowedBolt.initState(t);
        // Look up the saved state of every task of every component this bolt subscribes to.
        for (GlobalStreamId source : topologyContext.getThisSources().keySet()) {
            for (int taskId : topologyContext.getComponentTasks(source.get_componentId())) {
                TaskStream key = new TaskStream(taskId, source);
                WindowState saved = streamState.get(key);
                if (saved != null) {
                    recoveryStates.put(key, saved);
                }
            }
        }
        LOG.debug("recoveryStates {}", recoveryStates);
        stateInitialized = true;
        start();
    }

    /**
     * Commits {@code streamState} for the given transaction unless state has been
     * initialized and either recovery is still in progress or {@link #prePrepare}
     * has not yet prepared the state; in that case the commit is skipped.
     *
     * @param j the transaction id
     */
    @Override // org.apache.storm.topology.IStatefulComponent
    public void preCommit(long j) {
        if (!isStateInitialized() || (!isRecovering() && prePrepared)) {
            LOG.debug("Commit streamState, txid {}", j);
            streamState.commit(j);
        } else {
            LOG.debug("Still recovering, ignoring preCommit and not committing streamState.");
        }
    }

    /**
     * Prepares {@code streamState} for commit of the given transaction and records that
     * a prepare has happened. Does nothing (beyond logging) when state is not yet
     * initialized or recovery is still in progress.
     *
     * @param j the transaction id
     */
    @Override // org.apache.storm.topology.IStatefulComponent
    public void prePrepare(long j) {
        if (!isStateInitialized()) {
            LOG.warn("Cannot prepare before initState");
            return;
        }
        if (isRecovering()) {
            LOG.debug("Still recovering, ignoring prePrepare and not preparing streamState.");
            return;
        }
        LOG.debug("Prepare streamState, txid {}", j);
        streamState.prepareCommit(j);
        prePrepared = true;
    }

    /**
     * Rolls back {@code streamState}, regardless of whether state has been initialized.
     */
    @Override // org.apache.storm.topology.IStatefulComponent
    public void preRollback() {
        LOG.debug("Rollback streamState, stateInitialized {}", stateInitialized);
        streamState.rollback();
    }

    /**
     * Wraps the superclass's lifecycle listener. Expiry is forwarded unchanged.
     * Activation is forwarded and then followed by an update of the persisted
     * window state; an activation arriving while recovery is in progress is
     * treated as an error.
     *
     * @return the decorating listener
     */
    @Override // org.apache.storm.topology.WindowedBoltExecutor
    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        final WindowLifecycleListener<Tuple> delegate = super.newWindowLifecycleListener();
        return new WindowLifecycleListener<Tuple>() {
            @Override // org.apache.storm.windowing.WindowLifecycleListener
            public void onExpiry(List<Tuple> expiredEvents) {
                delegate.onExpiry(expiredEvents);
            }

            /**
             * @throws IllegalStateException if invoked while recovery is still in progress
             */
            @Override // org.apache.storm.windowing.WindowLifecycleListener
            public void onActivation(List<Tuple> events, List<Tuple> newEvents, List<Tuple> expired) {
                if (!StatefulWindowedBoltExecutor.this.isRecovering()) {
                    delegate.onActivation(events, newEvents, expired);
                    StatefulWindowedBoltExecutor.this.updateWindowState(expired, newEvents);
                    return;
                }
                String message = String.format("Unexpected activation with events %s, newEvents %s, expired %s in recovering state. recoveryStates %s ", events, newEvents, expired, StatefulWindowedBoltExecutor.this.recoveryStates);
                LOG.error(message);
                throw new IllegalStateException(message);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Derives the per-stream window state from one window activation and merges it
     * into {@code streamState}.
     *
     * @param list  tuples that expired from the window
     * @param list2 tuples that are new in the window
     */
    public void updateWindowState(List<Tuple> list, List<Tuple> list2) {
        LOG.debug("Update window state, {} expired, {} new events", list.size(), list2.size());
        Map<TaskStream, WindowState> perStream = new HashMap<>();
        // Expired tuples advance lastExpired; new tuples advance lastEvaluated.
        updateState(perStream, list, false);
        updateState(perStream, list2, true);
        updateStreamState(perStream);
    }

    /**
     * Folds the message ids of the given tuples into {@code map}, one entry per
     * source stream.
     *
     * @param map  accumulator of per-stream state; updated in place
     * @param list tuples to fold in
     * @param z    {@code true} to advance lastEvaluated, {@code false} to advance lastExpired
     */
    private void updateState(Map<TaskStream, WindowState> map, List<Tuple> list, boolean z) {
        for (Tuple tuple : list) {
            TaskStream source = TaskStream.fromTuple(tuple);
            WindowState current = map.get(source);
            long id = getMsgId(tuple);
            WindowState advanced = getUpdatedState(current, id, z);
            // A null result means the id did not move the state forward.
            if (advanced == null) {
                continue;
            }
            map.put(source, advanced);
        }
    }

    /**
     * Writes the given per-stream states into {@code streamState}. When a stream
     * already has a stored state, the stored and new values are merged by taking the
     * maximum of each id.
     *
     * @param map freshly computed state per source stream
     */
    private void updateStreamState(Map<TaskStream, WindowState> map) {
        for (Map.Entry<TaskStream, WindowState> update : map.entrySet()) {
            TaskStream source = update.getKey();
            WindowState fresh = update.getValue();
            WindowState stored = streamState.get(source);

            WindowState toStore;
            if (stored != null) {
                long expired = Math.max(fresh.lastExpired, stored.lastExpired);
                long evaluated = Math.max(fresh.lastEvaluated, stored.lastEvaluated);
                toStore = new WindowState(expired, evaluated);
                LOG.debug("Update window state, taskStream {}, curState {}, newState {}", source, stored, toStore);
            } else {
                toStore = fresh;
            }
            streamState.put(source, toStore);
        }
    }

    /**
     * Computes the state that results from observing message id {@code j}.
     *
     * @param windowState current state for the stream, or {@code null} if none yet
     * @param j           the observed message id
     * @param z           {@code true} if the id belongs to a newly evaluated tuple,
     *                    {@code false} if it belongs to an expired tuple
     * @return a new state with the relevant id advanced to {@code j}; when there was no
     *         current state the other id is {@code Long.MIN_VALUE}; {@code null} when
     *         {@code j} does not exceed the relevant id of the current state
     */
    private WindowState getUpdatedState(WindowState windowState, long j, boolean z) {
        if (z) {
            if (windowState == null) {
                return new WindowState(Long.MIN_VALUE, j);
            }
            return j > windowState.lastEvaluated ? new WindowState(windowState.lastExpired, j) : null;
        }
        if (windowState == null) {
            return new WindowState(j, Long.MIN_VALUE);
        }
        return j > windowState.lastExpired ? new WindowState(j, windowState.lastEvaluated) : null;
    }

    /**
     * Reads the message id from the tuple field named by {@code msgIdFieldName}.
     *
     * @param tuple the tuple to inspect
     * @return the message id as a primitive long
     */
    private long getMsgId(Tuple tuple) {
        Long boxedId = tuple.getLongByField(msgIdFieldName);
        return boxedId.longValue();
    }

    /**
     * Obtains the key-value state used to persist window state for this task. The
     * state namespace is {@code <componentId>-<taskId>-window}.
     *
     * @param map             topology configuration
     * @param topologyContext context of this task
     * @return the state returned by {@link StateFactory#getState}, cast to a key-value state
     */
    private KeyValueState<TaskStream, WindowState> getWindowState(Map map, TopologyContext topologyContext) {
        String namespace = topologyContext.getThisComponentId() + "-" + topologyContext.getThisTaskId() + "-window";
        State windowStore = StateFactory.getState(namespace, map, topologyContext);
        return (KeyValueState) windowStore;
    }
}
