package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.ExcludeAuthorizationCheck;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.RequestData;
import io.camunda.zeebe.engine.state.message.StoredMessage;
import io.camunda.zeebe.engine.state.mutable.MutableMessageCorrelationState;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageCorrelationRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.MessageCorrelationIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import java.util.concurrent.atomic.AtomicBoolean;

@ExcludeAuthorizationCheck
/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/MessageSubscriptionRejectProcessor.class */
public final class MessageSubscriptionRejectProcessor implements TypedRecordProcessor<MessageSubscriptionRecord> {
    private static final String SUBSCRIPTION_NOT_FOUND = "Expected to find subscription for message with name '%s' and correlation key '%s', but none was found.";
    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final MutableMessageCorrelationState messageCorrelationState;
    private final SubscriptionCommandSender commandSender;
    private final StateWriter stateWriter;
    private final TypedResponseWriter responseWriter;

    public MessageSubscriptionRejectProcessor(MessageState messageState, MessageSubscriptionState messageSubscriptionState, MutableMessageCorrelationState mutableMessageCorrelationState, SubscriptionCommandSender subscriptionCommandSender, Writers writers) {
        this.messageState = messageState;
        this.subscriptionState = messageSubscriptionState;
        this.messageCorrelationState = mutableMessageCorrelationState;
        this.commandSender = subscriptionCommandSender;
        this.stateWriter = writers.state();
        this.responseWriter = writers.response();
    }

    @Override // io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor
    public void processRecord(TypedRecord<MessageSubscriptionRecord> typedRecord) {
        MessageSubscriptionRecord value = typedRecord.getValue();
        this.stateWriter.appendFollowUpEvent(typedRecord.getKey(), MessageSubscriptionIntent.REJECTED, value);
        if (findSubscriptionToCorrelate(value)) {
            return;
        }
        writeNotCorrelatedResponse(typedRecord);
    }

    private boolean findSubscriptionToCorrelate(MessageSubscriptionRecord messageSubscriptionRecord) {
        long messageKey = messageSubscriptionRecord.getMessageKey();
        StoredMessage message = this.messageState.getMessage(messageKey);
        if (message == null) {
            return false;
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.subscriptionState.visitSubscriptions(messageSubscriptionRecord.getTenantId(), messageSubscriptionRecord.getMessageNameBuffer(), messageSubscriptionRecord.getCorrelationKeyBuffer(), messageSubscription -> {
            MessageSubscriptionRecord record = messageSubscription.getRecord();
            boolean z = record.getBpmnProcessIdBuffer().equals(messageSubscriptionRecord.getBpmnProcessIdBuffer()) && !messageSubscription.isCorrelating();
            if (z) {
                record.setMessageKey(messageKey).setVariables(message.getMessage().getVariablesBuffer());
                this.stateWriter.appendFollowUpEvent(messageSubscription.getKey(), MessageSubscriptionIntent.CORRELATING, record);
                sendCorrelateCommand(record);
                atomicBoolean.set(true);
            }
            return !z;
        });
        return atomicBoolean.get();
    }

    private void sendCorrelateCommand(MessageSubscriptionRecord messageSubscriptionRecord) {
        this.commandSender.correlateProcessMessageSubscription(messageSubscriptionRecord.getProcessInstanceKey(), messageSubscriptionRecord.getElementInstanceKey(), messageSubscriptionRecord.getBpmnProcessIdBuffer(), messageSubscriptionRecord.getMessageNameBuffer(), messageSubscriptionRecord.getMessageKey(), messageSubscriptionRecord.getVariablesBuffer(), messageSubscriptionRecord.getCorrelationKeyBuffer(), messageSubscriptionRecord.getTenantId());
    }

    private void writeNotCorrelatedResponse(TypedRecord<MessageSubscriptionRecord> typedRecord) {
        MessageSubscriptionRecord value = typedRecord.getValue();
        long messageKey = value.getMessageKey();
        if (this.messageCorrelationState.existsRequestDataForMessageKey(messageKey)) {
            RequestData requestData = this.messageCorrelationState.getRequestData(messageKey);
            this.stateWriter.appendFollowUpEvent(messageKey, MessageCorrelationIntent.NOT_CORRELATED, new MessageCorrelationRecord().setName(value.getMessageName()).setCorrelationKey(value.getCorrelationKey()).setVariables(value.getVariablesBuffer()).setTenantId(value.getTenantId()).setMessageKey(messageKey));
            this.responseWriter.writeRejection(typedRecord, RejectionType.NOT_FOUND, SUBSCRIPTION_NOT_FOUND.formatted(Long.valueOf(value.getMessageKey()), value.getCorrelationKey()), requestData.getRequestId().longValue(), requestData.getRequestStreamId().intValue());
        }
    }
}
