package io.zeebe.broker.task.processor;

import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.task.data.TaskEvent;
import io.zeebe.broker.task.data.TaskState;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.snapshot.SerializableWrapper;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.time.ClockUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/broker/task/processor/TaskExpireLockStreamProcessor.class */
/**
 * Stream processor that keeps an index of locked tasks and, on request, writes an
 * {@code EXPIRE_LOCK} task event for every indexed task whose expiration time has passed.
 *
 * <p>The index maps the key of a task event to an {@link ExpirationTimeBucket} holding the log
 * position of the {@code LOCKED} event and the lock time read from it. Entries are added when a
 * {@code LOCKED} event is processed and removed when a {@code LOCK_EXPIRED}, {@code COMPLETED} or
 * {@code FAILED} event is processed, or once an {@code EXPIRE_LOCK} event has been written
 * successfully by the expiration check.
 */
public class TaskExpireLockStreamProcessor implements StreamProcessor {
    protected static final int INDEX_VALUE_LENGTH = 8;

    // Collaborators taken from the StreamProcessorContext in onOpen().
    protected DeferredCommandContext cmdQueue;
    protected LogStreamReader targetLogStreamReader;
    protected LogStreamWriter targetLogStreamWriter;
    protected LogStream targetStream;
    protected DirectBuffer targetLogStreamTopicName;
    protected int targetLogStreamPartitionId;
    protected int streamProcessorId;

    protected final EventProcessor lockedEventProcessor = new LockedEventProcessor();
    protected final EventProcessor unlockEventProcessor = new UnlockEventProcessor();
    protected final EventProcessor expireLockEventProcessor = new ExpireLockEventProcessor();
    protected final Runnable checkLockExpirationCmd = new CheckLockExpirationCmd();

    /** Task event key -> position and expiration time of the corresponding LOCKED event. */
    protected HashMap<Long, ExpirationTimeBucket> index = new HashMap<>();
    protected SerializableWrapper<HashMap<Long, ExpirationTimeBucket>> indexSnapshot = new SerializableWrapper<>(this.index);

    // Reusable buffers for reading and writing task events.
    protected final BrokerEventMetadata targetEventMetadata = new BrokerEventMetadata();
    protected final TaskEvent taskEvent = new TaskEvent();

    // Key and position of the event most recently passed to onEvent().
    protected long eventKey = 0;
    protected long eventPosition = 0;

    /** Position returned by the last successful write of an EXPIRE_LOCK event. */
    protected long lastWrittenEventPosition = 0;

    /**
     * Command that scans the index and writes an {@code EXPIRE_LOCK} event for each task whose
     * expiration time is at or before the current time.
     */
    class CheckLockExpirationCmd implements Runnable {
        CheckLockExpirationCmd() {
        }

        /**
         * Walks over all indexed tasks and expires the ones that are due. An entry is only
         * dropped from the index after its {@code EXPIRE_LOCK} event was written successfully;
         * if the write is rejected the entry stays indexed so that a later check retries it.
         *
         * @throws IllegalStateException if the indexed event of a due task cannot be found in the log
         */
        @Override
        public void run() {
            final Iterator<Map.Entry<Long, ExpirationTimeBucket>> entries = index.entrySet().iterator();
            while (entries.hasNext()) {
                final Map.Entry<Long, ExpirationTimeBucket> entry = entries.next();
                final ExpirationTimeBucket bucket = entry.getValue();
                if (!lockExpired(bucket.getExpirationTime())) {
                    continue;
                }

                final LoggedEvent lockedEvent = findEvent(bucket.getEventPosition());
                if (tryWriteLockExpireEvent(entry.getKey().longValue(), lockedEvent)) {
                    entries.remove();
                }
            }
        }

        /**
         * @param lockExpirationTime the expiration time stored in the index
         * @return {@code true} if the given time is at or before the current clock time
         */
        protected boolean lockExpired(long lockExpirationTime) {
            return lockExpirationTime <= ClockUtil.getCurrentTimeInMillis();
        }

        /**
         * Reads the event at the given position from the target log stream.
         *
         * @param position the log position to seek to
         * @return the event found at that position
         * @throws IllegalStateException if the reader cannot seek to the position or has no event there
         */
        protected LoggedEvent findEvent(long position) {
            if (targetLogStreamReader.seek(position) && targetLogStreamReader.hasNext()) {
                return targetLogStreamReader.next();
            }
            throw new IllegalStateException("Failed to check the task lock expiration time. Indexed task event not found in log stream.");
        }

        /**
         * Writes an {@code EXPIRE_LOCK} event for the given task. Kept with its original
         * signature; the outcome of the write is not reported to the caller.
         *
         * @param key the key of the task event
         * @param lockedEvent the indexed event whose value is copied into the new event
         */
        protected void writeLockExpireEvent(long key, LoggedEvent lockedEvent) {
            tryWriteLockExpireEvent(key, lockedEvent);
        }

        /**
         * Copies the value of the given event, sets its state to {@code EXPIRE_LOCK} and tries to
         * append it to the log. On success the returned position is remembered in
         * {@code lastWrittenEventPosition}.
         *
         * @return {@code true} if the writer returned a non-negative position
         */
        private boolean tryWriteLockExpireEvent(long key, LoggedEvent lockedEvent) {
            taskEvent.reset();
            lockedEvent.readValue(taskEvent);
            taskEvent.setState(TaskState.EXPIRE_LOCK);

            targetEventMetadata.reset()
                .protocolVersion(1)
                .eventType(EventType.TASK_EVENT)
                .raftTermId(targetStream.getTerm());

            final long position = targetLogStreamWriter
                .producerId(streamProcessorId)
                .sourceEvent(targetLogStreamTopicName, targetLogStreamPartitionId, lockedEvent.getPosition())
                .key(key)
                .metadataWriter(targetEventMetadata)
                .valueWriter(taskEvent)
                .tryWrite();

            final boolean written = position >= 0;
            if (written) {
                lastWrittenEventPosition = position;
            }
            return written;
        }
    }

    /**
     * Processor for {@code EXPIRE_LOCK} events. It writes nothing itself and reports the position
     * remembered from the last successful expiration write.
     */
    class ExpireLockEventProcessor implements EventProcessor {
        ExpireLockEventProcessor() {
        }

        @Override
        public void processEvent() {
        }

        @Override
        public long writeEvent(LogStreamWriter logStreamWriter) {
            return lastWrittenEventPosition;
        }
    }

    /** Processor for {@code LOCKED} events: records the task in the index. */
    class LockedEventProcessor implements EventProcessor {
        LockedEventProcessor() {
        }

        @Override
        public void processEvent() {
        }

        @Override
        public void updateState() {
            final ExpirationTimeBucket bucket = new ExpirationTimeBucket(eventPosition, taskEvent.getLockTime());
            index.put(Long.valueOf(eventKey), bucket);
        }
    }

    /** Processor for events that end a lock: removes the task from the index. */
    class UnlockEventProcessor implements EventProcessor {
        UnlockEventProcessor() {
        }

        @Override
        public void processEvent() {
        }

        @Override
        public void updateState() {
            index.remove(Long.valueOf(eventKey));
        }
    }

    /** @return the snapshot wrapper around the index */
    @Override
    public SnapshotSupport getStateResource() {
        return indexSnapshot;
    }

    /**
     * Picks up the collaborators from the given context and re-reads the index from the snapshot
     * wrapper.
     *
     * @param streamProcessorContext the context this processor is opened with
     */
    @Override
    public void onOpen(StreamProcessorContext streamProcessorContext) {
        streamProcessorId = streamProcessorContext.getId();
        cmdQueue = streamProcessorContext.getStreamProcessorCmdQueue();

        targetLogStreamReader = streamProcessorContext.getTargetLogStreamReader();
        targetLogStreamWriter = streamProcessorContext.getLogStreamWriter();

        targetStream = streamProcessorContext.getTargetStream();
        targetLogStreamTopicName = targetStream.getTopicName();
        targetLogStreamPartitionId = targetStream.getPartitionId();

        // The wrapper may hold a different map instance than the one created at construction
        // time (presumably after snapshot recovery - confirm against SerializableWrapper).
        index = indexSnapshot.getObject();
    }

    /** @return a filter that accepts only events of type {@code TASK_EVENT} */
    public static MetadataFilter eventFilter() {
        return metadata -> metadata.getEventType() == EventType.TASK_EVENT;
    }

    /**
     * Remembers key and position of the event, reads its value into the shared task event and
     * selects the processor matching the task state.
     *
     * @param loggedEvent the event to process
     * @return the processor for the task state, or {@code null} if the state is not handled here
     */
    @Override
    public EventProcessor onEvent(LoggedEvent loggedEvent) {
        eventKey = loggedEvent.getKey();
        eventPosition = loggedEvent.getPosition();

        taskEvent.reset();
        loggedEvent.readValue(taskEvent);

        switch (taskEvent.getState()) {
            case LOCKED:
                return lockedEventProcessor;
            case EXPIRE_LOCK:
                return expireLockEventProcessor;
            case LOCK_EXPIRED:
            case COMPLETED:
            case FAILED:
                return unlockEventProcessor;
            default:
                return null;
        }
    }

    /** Schedules the lock expiration check on the stream processor's command queue. */
    public void checkLockExpirationAsync() {
        cmdQueue.runAsync(checkLockExpirationCmd);
    }
}
