package com.microsoft.azure.spring.integration.servicebus;

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.spring.integration.core.AzureCheckpointer;
import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.servicebus.converter.ServiceBusMessageConverter;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/* loaded from: input_file:com/microsoft/azure/spring/integration/servicebus/ServiceBusMessageHandler.class */
public abstract class ServiceBusMessageHandler<U> implements IMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(ServiceBusMessageHandler.class);
    private final Consumer<Message<U>> consumer;
    private final Class<U> payloadType;
    private final CheckpointConfig checkpointConfig;
    private final ServiceBusMessageConverter messageConverter;

    public ServiceBusMessageHandler(Consumer<Message<U>> consumer, Class<U> cls, CheckpointConfig checkpointConfig, ServiceBusMessageConverter serviceBusMessageConverter) {
        this.consumer = consumer;
        this.payloadType = cls;
        this.checkpointConfig = checkpointConfig;
        this.messageConverter = serviceBusMessageConverter;
    }

    public CompletableFuture<Void> onMessageAsync(IMessage iMessage) {
        HashMap hashMap = new HashMap();
        AzureCheckpointer azureCheckpointer = new AzureCheckpointer(() -> {
            return success(iMessage.getLockToken());
        }, () -> {
            return failure(iMessage.getLockToken());
        });
        if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.MANUAL) {
            hashMap.put("azure_checkpointer", azureCheckpointer);
        }
        Message<U> message = this.messageConverter.toMessage(iMessage, new MessageHeaders(hashMap), this.payloadType);
        this.consumer.accept(message);
        return this.checkpointConfig.getCheckpointMode() == CheckpointMode.RECORD ? azureCheckpointer.success().whenComplete((r6, th) -> {
            checkpointHandler(message, th);
        }) : CompletableFuture.completedFuture(null);
    }

    public void notifyException(Throwable th, ExceptionPhase exceptionPhase) {
        log.error(String.format("Exception encountered in phase %s", exceptionPhase), th);
    }

    protected abstract CompletableFuture<Void> success(UUID uuid);

    protected abstract CompletableFuture<Void> failure(UUID uuid);

    protected abstract String buildCheckpointFailMessage(Message<?> message);

    protected abstract String buildCheckpointSuccessMessage(Message<?> message);

    private void checkpointHandler(Message<?> message, Throwable th) {
        if (th != null) {
            if (log.isWarnEnabled()) {
                log.warn(buildCheckpointFailMessage(message), th);
            }
        } else if (log.isDebugEnabled()) {
            log.debug(buildCheckpointSuccessMessage(message));
        }
    }
}
