package com.microsoft.azure.spring.integration.core.support;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.core.api.StartPosition;
import com.microsoft.azure.spring.integration.eventhub.EventHubClientFactory;
import com.microsoft.azure.spring.integration.eventhub.EventHubOperation;
import com.microsoft.azure.spring.integration.eventhub.EventHubTemplate;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.springframework.messaging.Message;

/* loaded from: input_file:com/microsoft/azure/spring/integration/core/support/EventHubTestOperation.class */
public class EventHubTestOperation extends EventHubTemplate implements EventHubOperation {
    private final Multimap<String, EventData> eventHubsByName;
    private final Map<String, Map<String, EventHubTemplate.EventHubProcessor<?>>> processorsByNameAndGroup;
    private final Supplier<PartitionContext> partitionContextSupplier;

    public EventHubTestOperation(EventHubClientFactory eventHubClientFactory, Supplier<PartitionContext> supplier) {
        super(eventHubClientFactory);
        this.eventHubsByName = ArrayListMultimap.create();
        this.processorsByNameAndGroup = new ConcurrentHashMap();
        this.partitionContextSupplier = supplier;
    }

    @Override // com.microsoft.azure.spring.integration.eventhub.EventHubTemplate, com.microsoft.azure.spring.integration.core.api.SendOperation
    public <U> CompletableFuture<Void> sendAsync(String str, Message<U> message, PartitionSupplier partitionSupplier) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        EventData fromMessage = getMessageConverter().fromMessage(message, EventData.class);
        this.eventHubsByName.put(str, fromMessage);
        this.processorsByNameAndGroup.putIfAbsent(str, new ConcurrentHashMap());
        this.processorsByNameAndGroup.get(str).values().forEach(eventHubProcessor -> {
            try {
                eventHubProcessor.onEvents(this.partitionContextSupplier.get(), Collections.singleton(fromMessage));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        completableFuture.complete(null);
        return completableFuture;
    }

    @Override // com.microsoft.azure.spring.integration.eventhub.EventHubTemplate
    protected synchronized EventProcessorHost register(Tuple<String, String> tuple, Consumer<Message<?>> consumer, Class<?> cls) {
        String str = (String) tuple.getFirst();
        String str2 = (String) tuple.getSecond();
        this.processorsByNameAndGroup.putIfAbsent(str, new ConcurrentHashMap());
        this.processorsByNameAndGroup.get(str).computeIfAbsent(str2, str3 -> {
            return new EventHubTemplate.EventHubProcessor(consumer, cls);
        });
        if (getStartPosition() != StartPosition.EARLISET) {
            return null;
        }
        this.processorsByNameAndGroup.get(str).values().forEach(eventHubProcessor -> {
            try {
                eventHubProcessor.onEvents(this.partitionContextSupplier.get(), this.eventHubsByName.get(str));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        return null;
    }

    @Override // com.microsoft.azure.spring.integration.eventhub.EventHubTemplate, com.microsoft.azure.spring.integration.core.api.SubscribeByGroupOperation
    public boolean unsubscribe(String str, String str2) {
        this.processorsByNameAndGroup.get(str).remove(str2);
        return true;
    }
}
