package com.microsoft.azure.spring.integration.eventhub.support;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.core.api.StartPosition;
import com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.microsoft.azure.spring.integration.eventhub.impl.EventHubProcessor;
import com.microsoft.azure.spring.integration.eventhub.impl.EventHubTemplate;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.springframework.messaging.Message;

/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/support/EventHubTestOperation.class */
/**
 * In-memory variant of {@link EventHubTemplate}.
 *
 * <p>Sent messages are converted to {@link EventData}, recorded per event hub name and handed
 * synchronously to every {@link EventHubProcessor} currently registered for that event hub.
 * Nothing in this class talks to a remote service.
 *
 * <p>NOTE(review): {@link #register} is {@code synchronized} but {@link #sendAsync} is not, and the
 * backing {@code ArrayListMultimap} is not a concurrent collection — confirm that callers only use
 * this class from a single thread (or add synchronization) before relying on it concurrently.
 */
public class EventHubTestOperation extends EventHubTemplate {
    /** Every event sent so far, keyed by event hub name, in send order. */
    private final Multimap<String, EventData> eventHubsByName;
    /** Registered processors: event hub name -> (consumer group -> processor). */
    private final Map<String, Map<String, EventHubProcessor>> processorsByNameAndGroup;
    /** Produces the {@link PartitionContext} passed to processors on each delivery. */
    private final Supplier<PartitionContext> partitionContextSupplier;

    /**
     * @param eventHubClientFactory factory forwarded unchanged to the {@link EventHubTemplate} constructor
     * @param supplier source of the {@link PartitionContext} handed to processors on every delivery
     */
    public EventHubTestOperation(EventHubClientFactory eventHubClientFactory, Supplier<PartitionContext> supplier) {
        super(eventHubClientFactory);
        this.eventHubsByName = ArrayListMultimap.create();
        this.processorsByNameAndGroup = new ConcurrentHashMap<>();
        this.partitionContextSupplier = supplier;
    }

    /**
     * Records the message for the given event hub and delivers it immediately to every processor
     * registered for that hub.
     *
     * @param str event hub name
     * @param message message to convert to {@link EventData} and store
     * @param partitionSupplier not used by this implementation
     * @return a future that is already completed when this method returns; processor failures are
     *         printed and do not fail the future
     */
    @Override // com.microsoft.azure.spring.integration.eventhub.impl.AbstractEventHubTemplate
    public <U> CompletableFuture<Void> sendAsync(String str, Message<U> message, PartitionSupplier partitionSupplier) {
        EventData eventData = (EventData) getMessageConverter().fromMessage(message, EventData.class);
        this.eventHubsByName.put(str, eventData);
        processorsFor(str).values().forEach(processor -> deliver(processor, Collections.singleton(eventData)));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Registers a processor for a consumer group of an event hub. An existing registration for the
     * same hub and group is kept and the new processor is ignored.
     *
     * <p>When the start position is {@link StartPosition#EARLIEST}, all events stored for the hub
     * are then delivered to every processor registered on that hub.
     *
     * <p>NOTE(review): the replay targets all processors of the hub, not only the one being
     * registered, so already-registered groups see the stored events again — confirm whether that
     * is intended.
     *
     * @param str event hub name
     * @param str2 consumer group
     * @param eventHubProcessor processor to register
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.microsoft.azure.spring.integration.eventhub.impl.AbstractEventHubTemplate
    public synchronized void register(String str, String str2, EventHubProcessor eventHubProcessor) {
        Map<String, EventHubProcessor> processorsByGroup = processorsFor(str);
        processorsByGroup.putIfAbsent(str2, eventHubProcessor);
        if (getStartPosition() == StartPosition.EARLIEST) {
            processorsByGroup.values().forEach(processor -> deliver(processor, this.eventHubsByName.get(str)));
        }
    }

    /**
     * Removes the processor registered for the given event hub and consumer group, if any.
     * Unsubscribing from an event hub or group that was never registered is a no-op.
     *
     * @param str event hub name
     * @param str2 consumer group
     * @return always {@code true}
     */
    @Override // com.microsoft.azure.spring.integration.eventhub.impl.EventHubTemplate
    public boolean unsubscribe(String str, String str2) {
        Map<String, EventHubProcessor> processorsByGroup = this.processorsByNameAndGroup.get(str);
        // The hub has no entry until something was sent to it or registered on it; guard against
        // the NullPointerException the unchecked dereference would otherwise raise.
        if (processorsByGroup != null) {
            processorsByGroup.remove(str2);
        }
        return true;
    }

    /** Returns the consumer-group map of the given event hub, creating it on first use. */
    private Map<String, EventHubProcessor> processorsFor(String eventHubName) {
        return this.processorsByNameAndGroup.computeIfAbsent(eventHubName, key -> new ConcurrentHashMap<>());
    }

    /** Hands the events to one processor; a failure is printed so the remaining processors still run. */
    private void deliver(EventHubProcessor processor, Iterable<EventData> events) {
        try {
            processor.onEvents(this.partitionContextSupplier.get(), events);
        } catch (Exception e) {
            // Best-effort delivery: one failing processor must not prevent delivery to the others.
            e.printStackTrace();
        }
    }
}
