package com.microsoft.azure.spring.integration.eventhub.impl;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.spring.integration.core.AzureCheckpointer;
import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.eventhub.checkpoint.CheckpointManager;
import com.microsoft.azure.spring.integration.eventhub.converter.EventHubMessageConverter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/impl/EventHubProcessor.class */
public class EventHubProcessor implements IEventProcessor {
    private static final Logger log = LoggerFactory.getLogger(EventHubProcessor.class);
    private final Consumer<Message<?>> consumer;
    private final Class<?> payloadType;
    private final CheckpointConfig checkpointConfig;
    private final EventHubMessageConverter messageConverter;
    private final CheckpointManager checkpointManager;

    public EventHubProcessor(Consumer<Message<?>> consumer, Class<?> cls, CheckpointConfig checkpointConfig, EventHubMessageConverter eventHubMessageConverter) {
        this.consumer = consumer;
        this.payloadType = cls;
        this.checkpointConfig = checkpointConfig;
        this.messageConverter = eventHubMessageConverter;
        this.checkpointManager = CheckpointManager.of(checkpointConfig);
    }

    public void onOpen(PartitionContext partitionContext) throws Exception {
        log.info("Partition {} is opening", partitionContext.getPartitionId());
    }

    public void onClose(PartitionContext partitionContext, CloseReason closeReason) throws Exception {
        log.info("Partition {} is closing for reason {}", partitionContext.getPartitionId(), closeReason);
    }

    public void onEvents(PartitionContext partitionContext, Iterable<EventData> iterable) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("azure_raw_partition_id", partitionContext.getPartitionId());
        for (EventData eventData : iterable) {
            AzureCheckpointer azureCheckpointer = new AzureCheckpointer(() -> {
                return partitionContext.checkpoint(eventData);
            });
            if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.MANUAL) {
                hashMap.put("azure_checkpointer", azureCheckpointer);
            }
            this.consumer.accept(this.messageConverter.toMessage(eventData, new MessageHeaders(hashMap), this.payloadType));
            this.checkpointManager.onMessage(partitionContext, eventData);
        }
        this.checkpointManager.completeBatch(partitionContext);
    }

    public void onError(PartitionContext partitionContext, Throwable th) {
        log.error("Partition {} onError", partitionContext.getPartitionId(), th);
    }
}
