package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.ExcludeAuthorizationCheck;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.MessageCorrelationState;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.MessageSubscription;
import io.camunda.zeebe.engine.state.message.RequestData;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageCorrelationRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.MessageCorrelationIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.InstantSource;

/**
 * Record processor for {@link MessageSubscriptionRecord} commands that marks an existing message
 * subscription as correlated.
 *
 * <p>For each processed record it:
 *
 * <ol>
 *   <li>looks up the subscription by element instance key and message name, rejecting the command
 *       with {@link RejectionType#NOT_FOUND} when none is stored;
 *   <li>appends a {@link MessageSubscriptionIntent#CORRELATED} follow-up event carrying the stored
 *       subscription record;
 *   <li>if request data is stored for the subscription's message key, appends a {@link
 *       MessageCorrelationIntent#CORRELATED} event and writes a response for that request;
 *   <li>for non-interrupting subscriptions, asks the {@link MessageCorrelator} to correlate the
 *       next message.
 * </ol>
 */
@ExcludeAuthorizationCheck
public final class MessageSubscriptionCorrelateProcessor implements TypedRecordProcessor<MessageSubscriptionRecord> {

    private static final String NO_SUBSCRIPTION_FOUND_MESSAGE = "Expected to correlate subscription for element with key '%d' and message name '%s', but no such message subscription exists";

    private final MessageSubscriptionState subscriptionState;
    private final MessageCorrelationState messageCorrelationState;
    private final MessageCorrelator messageCorrelator;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;
    private final TypedResponseWriter responseWriter;

    /**
     * Wires the processor to the engine state and writers.
     *
     * @param i forwarded unchanged as the first argument of the {@link MessageCorrelator}
     *     constructor (presumably the partition id; confirm against that constructor)
     * @param messageState message state handed to the {@link MessageCorrelator}
     * @param messageCorrelationState state queried for request data stored per message key
     * @param messageSubscriptionState state used to look up the subscription to correlate
     * @param subscriptionCommandSender sender handed to the {@link MessageCorrelator}
     * @param writers source of the state, rejection, response and side-effect writers
     * @param instantSource time source handed to the {@link MessageCorrelator}
     */
    public MessageSubscriptionCorrelateProcessor(
            int i,
            MessageState messageState,
            MessageCorrelationState messageCorrelationState,
            MessageSubscriptionState messageSubscriptionState,
            SubscriptionCommandSender subscriptionCommandSender,
            Writers writers,
            InstantSource instantSource) {
        this.subscriptionState = messageSubscriptionState;
        this.messageCorrelationState = messageCorrelationState;
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        this.responseWriter = writers.response();
        // The correlator shares this processor's state writer.
        this.messageCorrelator =
                new MessageCorrelator(
                        i,
                        messageState,
                        subscriptionCommandSender,
                        this.stateWriter,
                        writers.sideEffect(),
                        instantSource);
    }

    /**
     * Correlates the subscription addressed by the given command, or rejects the command when the
     * subscription cannot be found.
     *
     * @param typedRecord the command identifying the subscription by element instance key and
     *     message name
     */
    @Override
    public void processRecord(TypedRecord<MessageSubscriptionRecord> typedRecord) {
        final MessageSubscriptionRecord command = typedRecord.getValue();
        final MessageSubscription subscription =
                this.subscriptionState.get(
                        command.getElementInstanceKey(), command.getMessageNameBuffer());

        if (subscription == null) {
            rejectCommand(typedRecord);
            return;
        }

        // From here on the stored subscription record is used, not the incoming command value.
        final MessageSubscriptionRecord storedRecord = subscription.getRecord();
        this.stateWriter.appendFollowUpEvent(
                subscription.getKey(), MessageSubscriptionIntent.CORRELATED, storedRecord);
        writeCorrelationResponse(typedRecord, storedRecord);

        // Only non-interrupting subscriptions go on to correlate a further message.
        if (!storedRecord.isInterrupting()) {
            this.messageCorrelator.correlateNextMessage(subscription.getKey(), storedRecord);
        }
    }

    /**
     * Emits a {@link MessageCorrelationIntent#CORRELATED} event and a response when request data
     * is stored for the subscription's message key; does nothing otherwise.
     *
     * @param typedRecord the command being processed; its message key is used as the response key
     * @param messageSubscriptionRecord the stored subscription record supplying the correlation
     *     details
     */
    private void writeCorrelationResponse(
            TypedRecord<MessageSubscriptionRecord> typedRecord,
            MessageSubscriptionRecord messageSubscriptionRecord) {
        final long messageKey = messageSubscriptionRecord.getMessageKey();
        if (!this.messageCorrelationState.existsRequestDataForMessageKey(messageKey)) {
            return;
        }

        final RequestData requestData = this.messageCorrelationState.getRequestData(messageKey);
        final MessageCorrelationRecord correlationRecord =
                new MessageCorrelationRecord()
                        .setName(messageSubscriptionRecord.getMessageName())
                        .setCorrelationKey(messageSubscriptionRecord.getCorrelationKey())
                        .setVariables(messageSubscriptionRecord.getVariablesBuffer())
                        .setTenantId(messageSubscriptionRecord.getTenantId())
                        .setMessageKey(messageKey)
                        .setProcessInstanceKey(messageSubscriptionRecord.getProcessInstanceKey());

        this.stateWriter.appendFollowUpEvent(
                messageKey, MessageCorrelationIntent.CORRELATED, correlationRecord);

        // NOTE(review): the response key is read from the command, while the lookup and the event
        // above use the stored subscription's message key - confirm the two always agree.
        final long responseKey = typedRecord.getValue().getMessageKey();
        final long requestId = requestData.getRequestId().longValue();
        final int requestStreamId = requestData.getRequestStreamId().intValue();
        this.responseWriter.writeResponse(
                responseKey,
                MessageCorrelationIntent.CORRELATED,
                correlationRecord,
                ValueType.MESSAGE_CORRELATION,
                requestId,
                requestStreamId);
    }

    /**
     * Rejects the command with {@link RejectionType#NOT_FOUND}, naming the element instance key
     * and message name that had no matching subscription.
     *
     * @param typedRecord the command to reject
     */
    private void rejectCommand(TypedRecord<MessageSubscriptionRecord> typedRecord) {
        final MessageSubscriptionRecord command = typedRecord.getValue();
        final long elementInstanceKey = command.getElementInstanceKey();
        final String messageName = BufferUtil.bufferAsString(command.getMessageNameBuffer());
        final String reason =
                String.format(NO_SUBSCRIPTION_FOUND_MESSAGE, elementInstanceKey, messageName);
        this.rejectionWriter.appendRejection(typedRecord, RejectionType.NOT_FOUND, reason);
    }
}
