package com.microsoft.azure.toolkit.lib.eventhubs;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.resourcemanager.eventhubs.EventHubsManager;
import com.azure.resourcemanager.eventhubs.fluent.models.EventhubInner;
import com.azure.resourcemanager.eventhubs.models.AccessRights;
import com.azure.resourcemanager.eventhubs.models.EntityStatus;
import com.azure.resourcemanager.eventhubs.models.EventHub;
import com.azure.resourcemanager.eventhubs.models.EventHubAuthorizationRule;
import com.microsoft.azure.toolkit.lib.Azure;
import com.microsoft.azure.toolkit.lib.common.bundle.AzureString;
import com.microsoft.azure.toolkit.lib.common.exception.AzureToolkitRuntimeException;
import com.microsoft.azure.toolkit.lib.common.messager.AzureMessager;
import com.microsoft.azure.toolkit.lib.common.messager.IAzureMessager;
import com.microsoft.azure.toolkit.lib.common.model.AbstractAzResource;
import com.microsoft.azure.toolkit.lib.common.model.AbstractAzResourceModule;
import com.microsoft.azure.toolkit.lib.common.model.Deletable;
import com.microsoft.azure.toolkit.lib.common.utils.Utils;
import com.microsoft.azure.toolkit.lib.resource.message.ISenderReceiver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import reactor.core.Disposable;

/* loaded from: input_file:com/microsoft/azure/toolkit/lib/eventhubs/EventHubsInstance.class */
public class EventHubsInstance extends AbstractAzResource<EventHubsInstance, EventHubsNamespace, EventHub> implements Deletable, ISenderReceiver {

    @Nullable
    private EntityStatus entityStatus;

    @Nullable
    private EventHubConsumerAsyncClient consumerAsyncClient;
    private final List<Disposable> receivers;

    @Nullable
    private IAzureMessager messager;

    /* JADX INFO: Access modifiers changed from: protected */
    public EventHubsInstance(@Nonnull String str, @Nonnull EventHubsInstanceModule eventHubsInstanceModule) {
        super(str, eventHubsInstanceModule);
        this.receivers = new ArrayList();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public EventHubsInstance(@Nonnull EventHub eventHub, @Nonnull EventHubsInstanceModule eventHubsInstanceModule) {
        super(eventHub.name(), eventHubsInstanceModule);
        this.receivers = new ArrayList();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void updateAdditionalProperties(@Nullable EventHub eventHub, @Nullable EventHub eventHub2) {
        super.updateAdditionalProperties(eventHub, eventHub2);
        this.entityStatus = (EntityStatus) Optional.ofNullable(eventHub).map((v0) -> {
            return v0.innerModel();
        }).map((v0) -> {
            return v0.status();
        }).orElse(null);
    }

    @Nonnull
    public List<AbstractAzResourceModule<?, ?, ?>> getSubModules() {
        return Collections.emptyList();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nonnull
    public String loadStatus(@Nonnull EventHub eventHub) {
        return ((EventhubInner) eventHub.innerModel()).status().toString();
    }

    public void updateStatus(EntityStatus entityStatus) {
        EventhubInner eventhubInner = (EventhubInner) remoteOptional().map((v0) -> {
            return v0.innerModel();
        }).orElse(new EventhubInner());
        EventHubsNamespace parent = getParent();
        Optional.ofNullable(parent.getParent().getRemote()).map((v0) -> {
            return v0.serviceClient();
        }).map((v0) -> {
            return v0.getEventHubs();
        }).ifPresent(eventHubsClient -> {
            doModify(() -> {
                eventHubsClient.createOrUpdate(getResourceGroupName(), parent.getName(), getName(), eventhubInner.withStatus(entityStatus));
            }, "Updating");
        });
    }

    public synchronized void startReceivingMessage() {
        String eventHubsConsumerGroup = Azure.az().config().getEventHubsConsumerGroup();
        this.messager = AzureMessager.getMessager();
        this.messager.info(AzureString.format("Start listening to event hub ({0}) for consumerGroup ({1})...\n", new Object[]{getName(), eventHubsConsumerGroup}));
        this.messager.info("You can change default consumer group in Azure Settings\n");
        remoteOptional().ifPresent(eventHub -> {
            eventHub.partitionIds().forEach(str -> {
                this.consumerAsyncClient = new EventHubClientBuilder().connectionString(getOrCreateConnectionString(Collections.singletonList(AccessRights.LISTEN))).consumerGroup(eventHubsConsumerGroup).buildAsyncConsumerClient();
                this.messager.info(AzureString.format("Created receiver for partition ({0})\n", new Object[]{str}));
                Optional.ofNullable(this.consumerAsyncClient).ifPresent(eventHubConsumerAsyncClient -> {
                    this.receivers.add(eventHubConsumerAsyncClient.receiveFromPartition(str, EventPosition.latest()).subscribe(partitionEvent -> {
                        this.messager.info(AzureString.format("Message Received from partition (%s): ", new Object[]{str}));
                        this.messager.debug(AzureString.format("\"%s\"\n", new Object[]{partitionEvent.getData().getBodyAsString()}));
                    }));
                });
            });
        });
    }

    public synchronized void stopReceivingMessage() {
        Optional.ofNullable(this.consumerAsyncClient).ifPresent((v0) -> {
            v0.close();
        });
        ((IAzureMessager) Optional.ofNullable(this.messager).orElse(AzureMessager.getMessager())).info(AzureString.format("Stop listening to event hub ({0})\n", new Object[]{getName()}));
        this.consumerAsyncClient = null;
        this.receivers.forEach((v0) -> {
            v0.dispose();
        });
        this.receivers.clear();
    }

    public boolean isListening() {
        return Objects.nonNull(this.consumerAsyncClient);
    }

    public boolean isSendEnabled() {
        return (getEntityStatus() == EntityStatus.SEND_DISABLED || getEntityStatus() == EntityStatus.DISABLED) ? false : true;
    }

    public void sendMessage(String str) {
        IAzureMessager messager = AzureMessager.getMessager();
        messager.info(AzureString.format("Sending message to Event Hub (%s)...\n", new Object[]{getName()}));
        try {
            EventHubProducerClient buildProducerClient = new EventHubClientBuilder().connectionString(getOrCreateConnectionString(Collections.singletonList(AccessRights.SEND))).buildProducerClient();
            Throwable th = null;
            try {
                try {
                    EventDataBatch createBatch = buildProducerClient.createBatch();
                    EventData eventData = new EventData(str);
                    if (!createBatch.tryAdd(eventData)) {
                        buildProducerClient.send(createBatch);
                        createBatch = buildProducerClient.createBatch();
                        if (!createBatch.tryAdd(eventData)) {
                            messager.error(AzureString.format("Failed to send message to Event Hub (%s): %s", new Object[]{getName(), "Event is too large for an empty batch. Max size: " + createBatch.getMaxSizeInBytes()}));
                        }
                    }
                    if (createBatch.getCount() > 0) {
                        buildProducerClient.send(createBatch);
                        messager.info("Successfully sent message ");
                        messager.success(AzureString.format("\"%s\"", new Object[]{str}));
                        messager.info(AzureString.format(" to Event Hub (%s)\n", new Object[]{getName()}));
                    }
                    if (buildProducerClient != null) {
                        if (0 != 0) {
                            try {
                                buildProducerClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            buildProducerClient.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Exception e) {
            messager.error(AzureString.format("Failed to send message to Event Hub (%s): %s", new Object[]{getName(), e}));
        }
    }

    public String getOrCreateListenConnectionString() {
        return getOrCreateConnectionString(Collections.singletonList(AccessRights.LISTEN));
    }

    private String getOrCreateConnectionString(List<AccessRights> list) {
        List list2 = (List) Optional.ofNullable(getRemote()).map(eventHub -> {
            return (List) eventHub.listAuthorizationRules().stream().filter(eventHubAuthorizationRule -> {
                return new HashSet(eventHubAuthorizationRule.rights()).containsAll(list);
            }).collect(Collectors.toList());
        }).orElse(new ArrayList());
        if (!list2.isEmpty()) {
            return ((EventHubAuthorizationRule) list2.get(0)).getKeys().primaryConnectionString();
        }
        EventHubsManager eventHubsManager = (EventHubsManager) getParent().getParent().getRemote();
        if (Objects.isNull(eventHubsManager)) {
            throw new AzureToolkitRuntimeException(AzureString.format("resource ({0}) not found", new Object[]{getName()}).toString());
        }
        EventHubAuthorizationRule.DefinitionStages.WithAccessPolicy withExistingEventHub = ((EventHubAuthorizationRule.DefinitionStages.Blank) eventHubsManager.eventHubAuthorizationRules().define(String.format("policy-%s-%s", StringUtils.join(list, "-"), Utils.getTimestamp()))).withExistingEventHub(getResourceGroupName(), getParent().getName(), getName());
        EventHubAuthorizationRule.DefinitionStages.WithCreate withCreate = (EventHubAuthorizationRule.DefinitionStages.WithCreate) withExistingEventHub.withListenAccess();
        if (list.contains(AccessRights.MANAGE)) {
            withCreate = (EventHubAuthorizationRule.DefinitionStages.WithCreate) withExistingEventHub.withManageAccess();
        } else if (list.contains(AccessRights.SEND) && list.contains(AccessRights.LISTEN)) {
            withCreate = (EventHubAuthorizationRule.DefinitionStages.WithCreate) withExistingEventHub.withSendAndListenAccess();
        } else if (list.contains(AccessRights.SEND)) {
            withCreate = (EventHubAuthorizationRule.DefinitionStages.WithCreate) withExistingEventHub.withSendAccess();
        }
        return ((EventHubAuthorizationRule) withCreate.create()).getKeys().primaryConnectionString();
    }

    @Nullable
    public EntityStatus getEntityStatus() {
        return this.entityStatus;
    }
}
