package com.microsoft.azure.spring.integration.eventhub.checkpoint;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.EventContext;
import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/checkpoint/PartitionCountCheckpointManager.class */
/**
 * {@link CheckpointManager} for {@link CheckpointMode#PARTITION_COUNT}.
 *
 * <p>Keeps one event counter per partition id. Every call to
 * {@link #onMessage(EventContext, EventData)} increments the counter of the event's
 * partition; once the counter reaches the configured checkpoint count, an asynchronous
 * checkpoint is requested on the event context.
 */
public class PartitionCountCheckpointManager extends CheckpointManager {
    private static final Logger log = LoggerFactory.getLogger(PartitionCountCheckpointManager.class);

    /** Events counted per partition id since that partition's counter was last reset. */
    private final ConcurrentHashMap<String, AtomicInteger> countByPartition;

    /**
     * Creates a manager bound to the given configuration.
     *
     * @param checkpointConfig checkpoint configuration, passed on to the superclass
     * @throws IllegalArgumentException if the configured checkpoint mode is not
     *         {@link CheckpointMode#PARTITION_COUNT} (raised by {@link Assert#isTrue})
     */
    public PartitionCountCheckpointManager(CheckpointConfig checkpointConfig) {
        super(checkpointConfig);
        this.countByPartition = new ConcurrentHashMap<>();
        Assert.isTrue(this.checkpointConfig.getCheckpointMode() == CheckpointMode.PARTITION_COUNT,
                () -> "PartitionCountCheckpointManager should have checkpointMode partition_count");
    }

    /**
     * Counts the event against its partition and requests an asynchronous checkpoint once
     * the partition's counter reaches the configured checkpoint count.
     *
     * @param eventContext context of the received event; supplies the partition id and is
     *        the object the checkpoint is requested on
     * @param eventData the received event, used here only for the success/failure log calls
     */
    @Override
    public void onMessage(EventContext eventContext, EventData eventData) {
        String partitionId = eventContext.getPartitionContext().getPartitionId();

        // computeIfAbsent already returns the current (or freshly created) counter, so a
        // separate get() on the same key is unnecessary.
        AtomicInteger eventCount =
                this.countByPartition.computeIfAbsent(partitionId, id -> new AtomicInteger(0));

        if (eventCount.incrementAndGet() >= this.checkpointConfig.getCheckpointCount()) {
            eventContext.updateCheckpointAsync()
                    .doOnError(error -> logCheckpointFail(eventContext, eventData, error))
                    .doOnSuccess(ignored -> {
                        logCheckpointSuccess(eventContext, eventData);
                        // The counter is reset only on success: after a failed checkpoint it
                        // stays at or above the threshold, so the next event on this
                        // partition requests a checkpoint again.
                        eventCount.set(0);
                    })
                    .subscribe();
        }
    }

    /**
     * Returns the logger of this class.
     *
     * @return the class-level SLF4J logger
     */
    @Override
    protected Logger getLogger() {
        return log;
    }
}
