package io.zeebe.broker.task.processor;

import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.task.CreditsRequest;
import io.zeebe.broker.task.TaskSubscriptionManager;
import io.zeebe.broker.task.data.TaskEvent;
import io.zeebe.broker.task.data.TaskState;
import io.zeebe.broker.task.map.TaskInstanceMap;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.broker.util.PayloadUtil;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/broker/task/processor/TaskInstanceStreamProcessor.class */
/**
 * Stream processor for task events.
 *
 * <p>{@code onEvent} reads each logged event into the shared {@link #taskEvent} and picks one of
 * the per-command {@link EventProcessor} instances declared below according to the event's task
 * state. The processors consult and update {@link #taskIndex}, which holds a numeric lifecycle
 * state (and lock owner) per task instance key.
 */
public class TaskInstanceStreamProcessor implements StreamProcessor {
    // Lifecycle state codes kept per task instance in taskIndex.
    protected static final short STATE_CREATED = 1;
    protected static final short STATE_LOCKED = 2;
    protected static final short STATE_FAILED = 3;
    protected static final short STATE_LOCK_EXPIRED = 4;
    // Collaborators supplied through the constructor.
    protected final CommandResponseWriter responseWriter;
    protected final SubscribedEventWriter subscribedEventWriter;
    protected final TaskSubscriptionManager taskSubscriptionManager;
    // Captured from the stream processor context in onOpen.
    protected DirectBuffer logStreamTopicName;
    protected int logStreamPartitionId;
    protected LogStream targetStream;
    // Metadata of the event being processed (filled in onEvent) and of the event being written.
    protected BrokerEventMetadata sourceEventMetadata = new BrokerEventMetadata();
    protected final BrokerEventMetadata targetEventMetadata = new BrokerEventMetadata();
    // One reusable processor per task command; onEvent returns the one matching the event state.
    protected final CreateTaskProcessor createTaskProcessor = new CreateTaskProcessor();
    protected final LockTaskProcessor lockTaskProcessor = new LockTaskProcessor();
    protected final CompleteTaskProcessor completeTaskProcessor = new CompleteTaskProcessor();
    protected final FailTaskProcessor failTaskProcessor = new FailTaskProcessor();
    protected final ExpireLockTaskProcessor expireLockTaskProcessor = new ExpireLockTaskProcessor();
    protected final UpdateRetriesTaskProcessor updateRetriesTaskProcessor = new UpdateRetriesTaskProcessor();
    protected final CancelTaskProcessor cancelTaskProcessor = new CancelTaskProcessor();
    // Reusable event value; reset and re-read for every event in onEvent.
    protected final TaskEvent taskEvent = new TaskEvent();
    // Reusable request object used when lock attempts are rejected.
    protected final CreditsRequest creditsRequest = new CreditsRequest();
    // Key and log position of the event currently being processed.
    protected long eventKey = 0;
    protected long eventPosition = 0;
    // Per-task state index; also the source of this processor's snapshot state.
    protected final TaskInstanceMap taskIndex = new TaskInstanceMap();

    /* loaded from: input_file:io/zeebe/broker/task/processor/TaskInstanceStreamProcessor$CancelTaskProcessor.class */
    /**
     * Processor returned by {@code onEvent} for task events in state {@code CANCEL}.
     *
     * <p>If the index holds a positive state for the event key, the event is marked
     * {@code CANCELED} and the task is removed from the index in {@link #updateState()};
     * otherwise the event is marked {@code CANCEL_REJECTED} and the index is left untouched.
     */
    private class CancelTaskProcessor implements EventProcessor {
        /** Outcome of the last {@link #processEvent()} call. */
        private boolean isCanceled;

        private CancelTaskProcessor() {
        }

        /** Decides whether the cancel command is accepted and sets the resulting event state. */
        @Override
        public void processEvent() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            this.isCanceled = false;

            final short indexedState = processor.taskIndex.wrapTaskInstanceKey(processor.eventKey).getState();
            if (indexedState > 0) {
                processor.taskEvent.setState(TaskState.CANCELED);
                this.isCanceled = true;
                return;
            }
            processor.taskEvent.setState(TaskState.CANCEL_REJECTED);
        }

        /** Writes the resulting task event through the enclosing processor. */
        @Override
        public long writeEvent(LogStreamWriter logStreamWriter) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(logStreamWriter);
        }

        /** Drops the task from the index, but only when the cancel was accepted. */
        @Override
        public void updateState() {
            if (!this.isCanceled) {
                return;
            }
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            processor.taskIndex.remove(processor.eventKey);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/task/processor/TaskInstanceStreamProcessor$CompleteTaskProcessor.class */
    /**
     * Processor returned by {@code onEvent} for task events in state {@code COMPLETE}.
     *
     * <p>The completion is accepted only when the indexed task is locked or its lock has expired,
     * the event payload is nil or valid, and the lock owner carried by the event equals the lock
     * owner stored in the index. Accepted tasks are removed from the index; a response is written
     * for accepted and rejected commands alike.
     */
    private class CompleteTaskProcessor implements EventProcessor {
        /** Outcome of the last {@link #processEvent()} call. */
        protected boolean isCompleted;

        private CompleteTaskProcessor() {
        }

        /** Evaluates the completion preconditions and sets the resulting event state. */
        @Override
        public void processEvent() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            this.isCompleted = false;

            processor.taskIndex.wrapTaskInstanceKey(processor.eventKey);
            final short indexedState = processor.taskIndex.getState();
            final boolean lockedOrExpired = indexedState == TaskInstanceStreamProcessor.STATE_LOCKED
                    || indexedState == TaskInstanceStreamProcessor.STATE_LOCK_EXPIRED;

            // Short-circuiting keeps the payload and owner checks off the path of tasks in other states.
            final boolean accepted = lockedOrExpired && hasUsablePayload() && isSentByLockOwner();
            this.isCompleted = accepted;
            processor.taskEvent.setState(accepted ? TaskState.COMPLETED : TaskState.COMPLETE_REJECTED);
        }

        /** Returns whether the event payload is either nil or valid. */
        private boolean hasUsablePayload() {
            final DirectBuffer payload = TaskInstanceStreamProcessor.this.taskEvent.getPayload();
            return PayloadUtil.isNilPayload(payload) || PayloadUtil.isValidPayload(payload);
        }

        /** Returns whether the event's lock owner equals the one stored in the index. */
        private boolean isSentByLockOwner() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            return BufferUtil.contentsEqual(processor.taskIndex.getLockOwner(), processor.taskEvent.getLockOwner());
        }

        /** Sends the command response; returns the writer's result. */
        @Override
        public boolean executeSideEffects() {
            return TaskInstanceStreamProcessor.this.writeResponse();
        }

        /** Writes the resulting task event through the enclosing processor. */
        @Override
        public long writeEvent(LogStreamWriter logStreamWriter) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(logStreamWriter);
        }

        /** Drops the task from the index, but only when the completion was accepted. */
        @Override
        public void updateState() {
            if (!this.isCompleted) {
                return;
            }
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            processor.taskIndex.remove(processor.eventKey);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/task/processor/TaskInstanceStreamProcessor$CreateTaskProcessor.class */
    /**
     * Processor returned by {@code onEvent} for task events in state {@code CREATE}.
     *
     * <p>Creation is never rejected here: the event is always marked {@code CREATED} and a new
     * index entry in state {@code STATE_CREATED} is written for the event key.
     */
    private class CreateTaskProcessor implements EventProcessor {
        private CreateTaskProcessor() {
        }

        /** Marks the event as created. */
        @Override
        public void processEvent() {
            TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.CREATED);
        }

        /**
         * Sends a command response when the source event carries request metadata.
         *
         * @return {@code true} when there is no request to answer, otherwise the response writer's result
         */
        @Override
        public boolean executeSideEffects() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            if (!processor.sourceEventMetadata.hasRequestMetadata()) {
                return true;
            }
            return processor.writeResponse();
        }

        /** Writes the resulting task event through the enclosing processor. */
        @Override
        public long writeEvent(LogStreamWriter logStreamWriter) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(logStreamWriter);
        }

        /** Registers the new task instance in the index with state {@code STATE_CREATED}. */
        @Override
        public void updateState() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            processor.taskIndex
                    .newTaskInstance(processor.eventKey)
                    .setState(TaskInstanceStreamProcessor.STATE_CREATED)
                    .write();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/task/processor/TaskInstanceStreamProcessor$ExpireLockTaskProcessor.class */
    /**
     * Processor returned by {@code onEvent} for task events in state {@code EXPIRE_LOCK}.
     *
     * <p>Only a task whose indexed state is {@code STATE_LOCKED} can expire: the event is then
     * marked {@code LOCK_EXPIRED} and the index entry moves to {@code STATE_LOCK_EXPIRED}. In
     * every other case the event is marked {@code LOCK_EXPIRATION_REJECTED}.
     */
    private class ExpireLockTaskProcessor implements EventProcessor {
        /** Outcome of the last {@link #processEvent()} call. */
        protected boolean isExpired;

        private ExpireLockTaskProcessor() {
        }

        /** Checks the indexed state and sets the resulting event state. */
        @Override
        public void processEvent() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            this.isExpired = false;

            processor.taskIndex.wrapTaskInstanceKey(processor.eventKey);
            if (processor.taskIndex.getState() == TaskInstanceStreamProcessor.STATE_LOCKED) {
                processor.taskEvent.setState(TaskState.LOCK_EXPIRED);
                this.isExpired = true;
            } else {
                processor.taskEvent.setState(TaskState.LOCK_EXPIRATION_REJECTED);
            }
        }

        /** Writes the resulting task event through the enclosing processor. */
        @Override
        public long writeEvent(LogStreamWriter logStreamWriter) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(logStreamWriter);
        }

        /**
         * Moves the index entry to {@code STATE_LOCK_EXPIRED} when the expiration was accepted.
         * Operates on the entry that {@link #processEvent()} wrapped.
         */
        @Override
        public void updateState() {
            if (!this.isExpired) {
                return;
            }
            TaskInstanceStreamProcessor.this.taskIndex
                    .setState(TaskInstanceStreamProcessor.STATE_LOCK_EXPIRED)
                    .write();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/task/processor/TaskInstanceStreamProcessor$FailTaskProcessor.class */
    /**
     * Processor returned by {@code onEvent} for task events in state {@code FAIL}.
     *
     * <p>The failure is accepted when the indexed task is in {@code STATE_LOCKED} and the lock
     * owner carried by the event equals the lock owner stored in the index. Accepted tasks move to
     * {@code STATE_FAILED}; a response is written for accepted and rejected commands alike.
     */
    private class FailTaskProcessor implements EventProcessor {
        /** Outcome of the last {@link #processEvent()} call. */
        protected boolean isFailed;

        private FailTaskProcessor() {
        }

        /** Evaluates the fail preconditions and sets the resulting event state. */
        @Override
        public void processEvent() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            this.isFailed = false;

            processor.taskIndex.wrapTaskInstanceKey(processor.eventKey);
            // The owner comparison is only evaluated for locked tasks (short-circuit).
            final boolean accepted = processor.taskIndex.getState() == TaskInstanceStreamProcessor.STATE_LOCKED
                    && BufferUtil.contentsEqual(processor.taskIndex.getLockOwner(), processor.taskEvent.getLockOwner());

            if (accepted) {
                processor.taskEvent.setState(TaskState.FAILED);
                this.isFailed = true;
            } else {
                processor.taskEvent.setState(TaskState.FAIL_REJECTED);
            }
        }

        /** Sends the command response; returns the writer's result. */
        @Override
        public boolean executeSideEffects() {
            return TaskInstanceStreamProcessor.this.writeResponse();
        }

        /** Writes the resulting task event through the enclosing processor. */
        @Override
        public long writeEvent(LogStreamWriter logStreamWriter) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(logStreamWriter);
        }

        /**
         * Moves the index entry to {@code STATE_FAILED} when the failure was accepted.
         * Operates on the entry that {@link #processEvent()} wrapped.
         */
        @Override
        public void updateState() {
            if (!this.isFailed) {
                return;
            }
            TaskInstanceStreamProcessor.this.taskIndex
                    .setState(TaskInstanceStreamProcessor.STATE_FAILED)
                    .write();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/task/processor/TaskInstanceStreamProcessor$LockTaskProcessor.class */
    /**
     * Processor returned by {@code onEvent} for task events in state {@code LOCK}.
     *
     * <p>A lock is granted when the indexed task is in {@code STATE_CREATED}, {@code STATE_FAILED}
     * or {@code STATE_LOCK_EXPIRED}; the index entry then moves to {@code STATE_LOCKED} and records
     * the event's lock owner. Any other indexed state yields {@code LOCK_REJECTED}.
     */
    private class LockTaskProcessor implements EventProcessor {
        /** Outcome of the last {@link #processEvent()} call. */
        protected boolean isLocked;

        private LockTaskProcessor() {
        }

        /** Checks whether the task can be locked and sets the resulting event state. */
        @Override
        public void processEvent() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            this.isLocked = false;

            final short indexedState = processor.taskIndex.wrapTaskInstanceKey(processor.eventKey).getState();
            final boolean lockable = indexedState == TaskInstanceStreamProcessor.STATE_CREATED
                    || indexedState == TaskInstanceStreamProcessor.STATE_FAILED
                    || indexedState == TaskInstanceStreamProcessor.STATE_LOCK_EXPIRED;

            if (lockable) {
                processor.taskEvent.setState(TaskState.LOCKED);
                this.isLocked = true;
            } else {
                processor.taskEvent.setState(TaskState.LOCK_REJECTED);
            }
        }

        /**
         * When the lock was granted, pushes the task event to the subscriber named in the source
         * metadata. When it was rejected, asks the subscription manager to increase that
         * subscriber's credits by one (presumably returning the credit the failed attempt
         * consumed; confirm against {@code TaskSubscriptionManager}).
         *
         * @return the result of whichever of the two operations was attempted
         */
        @Override
        public boolean executeSideEffects() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;

            if (!this.isLocked) {
                processor.creditsRequest.setSubscriberKey(processor.sourceEventMetadata.getSubscriberKey());
                processor.creditsRequest.setCredits(1);
                return processor.taskSubscriptionManager.increaseSubscriptionCreditsAsync(processor.creditsRequest);
            }

            return processor.subscribedEventWriter
                    .partitionId(processor.logStreamPartitionId)
                    .position(processor.eventPosition)
                    .key(processor.eventKey)
                    .subscriberKey(processor.sourceEventMetadata.getSubscriberKey())
                    .subscriptionType(SubscriptionType.TASK_SUBSCRIPTION)
                    .eventType(EventType.TASK_EVENT)
                    .eventWriter(processor.taskEvent)
                    .tryWriteMessage(processor.sourceEventMetadata.getRequestStreamId());
        }

        /** Writes the resulting task event through the enclosing processor. */
        @Override
        public long writeEvent(LogStreamWriter logStreamWriter) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(logStreamWriter);
        }

        /**
         * Records the granted lock in the index: state {@code STATE_LOCKED} plus the event's lock
         * owner. Operates on the entry that {@link #processEvent()} wrapped.
         */
        @Override
        public void updateState() {
            if (!this.isLocked) {
                return;
            }
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            processor.taskIndex
                    .setState(TaskInstanceStreamProcessor.STATE_LOCKED)
                    .setLockOwner(processor.taskEvent.getLockOwner())
                    .write();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/task/processor/TaskInstanceStreamProcessor$UpdateRetriesTaskProcessor.class */
    /**
     * Processor returned by {@code onEvent} for task events in state {@code UPDATE_RETRIES}.
     *
     * <p>The update is accepted when the indexed task is in {@code STATE_FAILED} and the event
     * carries a positive retry count; otherwise it is rejected. This processor does not modify
     * the index itself.
     */
    private class UpdateRetriesTaskProcessor implements EventProcessor {
        private UpdateRetriesTaskProcessor() {
        }

        /** Evaluates the update preconditions and sets the resulting event state. */
        @Override
        public void processEvent() {
            final TaskInstanceStreamProcessor processor = TaskInstanceStreamProcessor.this;
            final short indexedState = processor.taskIndex.wrapTaskInstanceKey(processor.eventKey).getState();

            // The retry count is only read for failed tasks (short-circuit).
            final boolean accepted = indexedState == TaskInstanceStreamProcessor.STATE_FAILED
                    && processor.taskEvent.getRetries() > 0;

            if (accepted) {
                processor.taskEvent.setState(TaskState.RETRIES_UPDATED);
            } else {
                processor.taskEvent.setState(TaskState.UPDATE_RETRIES_REJECTED);
            }
        }

        /** Sends the command response; returns the writer's result. */
        @Override
        public boolean executeSideEffects() {
            return TaskInstanceStreamProcessor.this.writeResponse();
        }

        /** Writes the resulting task event through the enclosing processor. */
        @Override
        public long writeEvent(LogStreamWriter logStreamWriter) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(logStreamWriter);
        }
    }

    /**
     * Creates the processor with the collaborators it uses for side effects.
     *
     * @param commandResponseWriter writer used by {@code writeResponse} to answer client commands
     * @param subscribedEventWriter writer used to push locked task events to subscribers
     * @param taskSubscriptionManager manager asked to increase subscription credits when a lock is rejected
     */
    public TaskInstanceStreamProcessor(CommandResponseWriter commandResponseWriter, SubscribedEventWriter subscribedEventWriter, TaskSubscriptionManager taskSubscriptionManager) {
        this.responseWriter = commandResponseWriter;
        this.subscribedEventWriter = subscribedEventWriter;
        this.taskSubscriptionManager = taskSubscriptionManager;
    }

    /**
     * Returns a fixed priority of 100.
     *
     * @param j ignored by this implementation
     * @return always {@code 100}
     */
    @Override // io.zeebe.logstreams.processor.StreamProcessor
    public int getPriority(long j) {
        return 100;
    }

    /**
     * Exposes the task index as this processor's snapshot state.
     *
     * @return the snapshot support object provided by {@code taskIndex}
     */
    @Override // io.zeebe.logstreams.processor.StreamProcessor
    public SnapshotSupport getStateResource() {
        return this.taskIndex.getSnapshotSupport();
    }

    /**
     * Captures the topic name and partition id of the source stream, and the target stream, from
     * the given context.
     *
     * @param streamProcessorContext context supplying the source and target log streams
     */
    @Override
    public void onOpen(StreamProcessorContext streamProcessorContext) {
        final LogStream source = streamProcessorContext.getSourceStream();

        this.logStreamTopicName = source.getTopicName();
        this.logStreamPartitionId = source.getPartitionId();

        this.targetStream = streamProcessorContext.getTargetStream();
    }

    /** Closes the task index when the processor is closed. */
    @Override // io.zeebe.logstreams.processor.StreamProcessor
    public void onClose() {
        this.taskIndex.close();
    }

    /**
     * Builds the metadata filter for this processor.
     *
     * @return a filter that accepts only events whose type is {@code EventType.TASK_EVENT}
     */
    public static MetadataFilter eventFilter() {
        return metadata -> metadata.getEventType() == EventType.TASK_EVENT;
    }

    /**
     * Loads the given event into the shared processing state and selects the processor for it.
     *
     * <p>Resets the task index and the reusable task event, remembers the event's key and
     * position, reads its metadata and value, and then dispatches on the task state.
     *
     * @param loggedEvent the event read from the log
     * @return the processor matching the event's task state, or {@code null} when the state is
     *     not one of the commands handled here
     */
    @Override
    public EventProcessor onEvent(LoggedEvent loggedEvent) {
        this.taskIndex.reset();

        this.eventKey = loggedEvent.getKey();
        this.eventPosition = loggedEvent.getPosition();
        loggedEvent.readMetadata(this.sourceEventMetadata);

        this.taskEvent.reset();
        loggedEvent.readValue(this.taskEvent);

        switch (this.taskEvent.getState()) {
            case CREATE:
                return this.createTaskProcessor;
            case LOCK:
                return this.lockTaskProcessor;
            case COMPLETE:
                return this.completeTaskProcessor;
            case FAIL:
                return this.failTaskProcessor;
            case EXPIRE_LOCK:
                return this.expireLockTaskProcessor;
            case UPDATE_RETRIES:
                return this.updateRetriesTaskProcessor;
            case CANCEL:
                return this.cancelTaskProcessor;
            default:
                // Every other state has no processor here.
                return null;
        }
    }

    /** Resets the reusable task event once an event has been handled. */
    @Override // io.zeebe.logstreams.processor.StreamProcessor
    public void afterEvent() {
        this.taskEvent.reset();
    }

    /**
     * Sends the current task event back to the requester of the source event.
     *
     * <p>The response carries this partition's id plus the position and key of the event being
     * processed, and is addressed with the request stream id and request id taken from the source
     * event metadata.
     *
     * @return the result of {@code tryWriteResponse}
     */
    protected boolean writeResponse() {
        return this.responseWriter
                .partitionId(this.logStreamPartitionId)
                .position(this.eventPosition)
                .key(this.eventKey)
                .eventWriter(this.taskEvent)
                .tryWriteResponse(
                        this.sourceEventMetadata.getRequestStreamId(),
                        this.sourceEventMetadata.getRequestId());
    }

    /**
     * Writes the current task event to the log under the key of the event being processed.
     *
     * <p>The target metadata is reset first and then stamped with protocol version 1 and event
     * type {@code TASK_EVENT}.
     *
     * @param logStreamWriter writer to append the event with
     * @return the result of {@code tryWrite}
     */
    protected long writeEventToLogStream(LogStreamWriter logStreamWriter) {
        this.targetEventMetadata.reset();
        this.targetEventMetadata
                .protocolVersion(1)
                .eventType(EventType.TASK_EVENT);

        return logStreamWriter
                .key(this.eventKey)
                .metadataWriter(this.targetEventMetadata)
                .valueWriter(this.taskEvent)
                .tryWrite();
    }
}
