package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.ExcludeAuthorizationCheck;
import io.camunda.zeebe.engine.processing.distribution.CommandDistributionBehavior;
import io.camunda.zeebe.engine.processing.streamprocessor.DistributedTypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.MessageSubscription;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;

@ExcludeAuthorizationCheck
/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/MessageSubscriptionMigrateProcessor.class */
public class MessageSubscriptionMigrateProcessor implements DistributedTypedRecordProcessor<MessageSubscriptionRecord> {
    private final MessageSubscriptionState subscriptionState;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;
    private final CommandDistributionBehavior commandDistributionBehavior;
    private final MessageSubscriptionRecord messageSubscriptionRecord = new MessageSubscriptionRecord();

    public MessageSubscriptionMigrateProcessor(MessageSubscriptionState messageSubscriptionState, Writers writers, CommandDistributionBehavior commandDistributionBehavior) {
        this.subscriptionState = messageSubscriptionState;
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        this.commandDistributionBehavior = commandDistributionBehavior;
    }

    @Override // io.camunda.zeebe.engine.processing.streamprocessor.DistributedTypedRecordProcessor
    public void processNewCommand(TypedRecord<MessageSubscriptionRecord> typedRecord) {
        migrateMessageSubscription(typedRecord);
    }

    @Override // io.camunda.zeebe.engine.processing.streamprocessor.DistributedTypedRecordProcessor
    public void processDistributedCommand(TypedRecord<MessageSubscriptionRecord> typedRecord) {
        migrateMessageSubscription(typedRecord);
        this.commandDistributionBehavior.acknowledgeCommand(typedRecord);
    }

    private void migrateMessageSubscription(TypedRecord<MessageSubscriptionRecord> typedRecord) {
        MessageSubscriptionRecord value = typedRecord.getValue();
        MessageSubscription messageSubscription = this.subscriptionState.get(value.getElementInstanceKey(), value.getMessageNameBuffer());
        if (messageSubscription == null) {
            this.rejectionWriter.appendRejection(typedRecord, RejectionType.NOT_FOUND, "Expected to migrate a message subscription with key '%s', but subscription not found for element instance key '%d' and the provided message name".formatted(Long.valueOf(typedRecord.getKey()), Long.valueOf(value.getElementInstanceKey())));
            return;
        }
        this.messageSubscriptionRecord.reset();
        this.messageSubscriptionRecord.copyFrom(messageSubscription.getRecord());
        this.stateWriter.appendFollowUpEvent(messageSubscription.getKey(), MessageSubscriptionIntent.MIGRATED, this.messageSubscriptionRecord.setBpmnProcessId(value.getBpmnProcessIdBuffer()).setInterrupting(value.isInterrupting()));
    }
}
