package org.distributeme.support.eventservice;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import net.anotheria.anoprise.dataspace.persistence.DataspacePersistenceConfiguration;
import net.anotheria.anoprise.eventservice.EventChannel;
import net.anotheria.anoprise.eventservice.EventService;
import net.anotheria.anoprise.eventservice.EventServiceFactory;
import net.anotheria.anoprise.eventservice.EventServiceListener;
import net.anotheria.anoprise.eventservice.EventTransportShell;
import net.anotheria.anoprise.eventservice.ProxyType;
import net.anotheria.anoprise.eventservice.RemoteEventChannelConsumerProxy;
import net.anotheria.anoprise.eventservice.RemoteEventChannelSupplierProxy;
import net.anotheria.anoprise.eventservice.RemoteEventChannelSupportFactory;
import net.anotheria.util.IdCodeGenerator;
import org.distributeme.core.RMIRegistryUtil;
import org.distributeme.core.RegistryUtil;
import org.distributeme.core.ServiceDescriptor;
import org.distributeme.core.util.EventServiceRegistryUtil;
import org.distributeme.support.eventservice.generated.EventServiceRMIBridgeServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/distributeme-support-2.1.0.jar:org/distributeme/support/eventservice/DiMeRemoteEventChannelRMISupport.class */
/**
 * Remote event channel support that connects the local {@link EventService} with
 * {@link EventServiceRMIBridgeService} instances of other participants.
 *
 * <p>The class acts as {@link RemoteEventChannelSupportFactory} (it creates the DiMe specific
 * remote consumer/supplier proxies) and as {@link EventServiceListener} (it reacts on locally
 * created channels by registering them via {@link EventServiceRegistryUtil} and by contacting the
 * bridges of the returned counterparts).
 *
 * <p>NOTE(review): {@code descriptor} and {@code es} are static fields that are (re)assigned by the
 * instance constructor and by {@link #initEventService()} without synchronization; the class looks
 * like it is meant to be used as a singleton - confirm before creating more than one instance.
 * NOTE(review): the internal executor is never shut down by this class.
 */
public class DiMeRemoteEventChannelRMISupport implements RemoteEventChannelSupportFactory, EventServiceListener {
    /** Service id under which the local event service bridge is described. */
    private static final String BRIDGE_SERVICE_ID = "org_distributeme_support_eventservice_EventServiceRMIBridgeService";
    /** Fully qualified name of the generated bridge stub, which is instantiated reflectively. */
    private static final String BRIDGE_STUB_CLASS = "org.distributeme.support.eventservice.generated.RemoteEventServiceRMIBridgeServiceStub";
    /** Id of this instance, generated once per class load. */
    private static final String INSTANCE_ID = IdCodeGenerator.generateCode(10);
    /** Cache of bridge stubs, keyed by the descriptor of the remote participant. */
    private static final ConcurrentHashMap<ServiceDescriptor, EventServiceRMIBridgeService> bridges = new ConcurrentHashMap<>();
    private static final Logger LOG = LoggerFactory.getLogger(DiMeRemoteEventChannelRMISupport.class);

    /** Local event service, assigned in the constructor. */
    private static EventService es;
    /** Descriptor of the local bridge service ("home reference"). */
    private static ServiceDescriptor descriptor = null;

    /** Executes the asynchronous registrations triggered by {@link #registerRemoteSupplier}. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the support object, (re)creates the local service descriptor and obtains the event
     * service from {@link EventServiceFactory}.
     */
    public DiMeRemoteEventChannelRMISupport() {
        descriptor = createLocalDescriptor();
        es = EventServiceFactory.createEventService();
    }

    /**
     * Builds the RMI descriptor of the local bridge service from the bridge service id, this
     * instance's id and the current RMI registry port.
     *
     * @return a newly created local service descriptor
     */
    private static ServiceDescriptor createLocalDescriptor() {
        return RegistryUtil.createLocalServiceDescription(ServiceDescriptor.Protocol.RMI, BRIDGE_SERVICE_ID, INSTANCE_ID, RMIRegistryUtil.getRmiRegistryPort());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the descriptor of the local bridge service.
     *
     * @return the current local service descriptor
     */
    public ServiceDescriptor getHomeReference() {
        return descriptor;
    }

    /**
     * Creates the remote consumer proxy for a channel.
     *
     * @param str name of the channel
     * @return a new {@link DiMeRemoteEventChannelConsumerProxy}
     */
    @Override // net.anotheria.anoprise.eventservice.RemoteEventChannelSupportFactory
    public RemoteEventChannelConsumerProxy createRemoteEventChannelConsumerProxy(String str) {
        return new DiMeRemoteEventChannelConsumerProxy(str);
    }

    /**
     * Creates the remote supplier proxy for a channel.
     *
     * @param str name of the channel
     * @return a new {@link DiMeRemoteEventChannelSupplierProxy}
     */
    @Override // net.anotheria.anoprise.eventservice.RemoteEventChannelSupportFactory
    public RemoteEventChannelSupplierProxy createRemoteEventChannelSupplierProxy(String str) {
        return new DiMeRemoteEventChannelSupplierProxy(str);
    }

    /**
     * Dispatches the creation of a local push proxy to the matching registration routine.
     *
     * @param str name of the created channel
     * @param proxyType type of the created proxy
     * @throws AssertionError if the proxy type is neither push consumer nor push supplier
     */
    @Override // net.anotheria.anoprise.eventservice.EventServiceListener
    public void channelCreated(String str, ProxyType proxyType) {
        switch (proxyType) {
            case PUSH_CONSUMER_PROXY:
                localConsumerProxyCreated(str);
                return;
            case PUSH_SUPPLIER_PROXY:
                localSupplierProxyCreated(str);
                return;
            default:
                throw new AssertionError("Unknown proxy type " + proxyType);
        }
    }

    /**
     * Returns the cached bridge stub for the given descriptor, creating it reflectively on first use.
     *
     * @param serviceDescriptor descriptor of the remote participant
     * @return the bridge stub, or {@code null} if the stub could not be instantiated
     * @throws AssertionError if the stub class or its {@code ServiceDescriptor} constructor is missing
     */
    private EventServiceRMIBridgeService getBridge(ServiceDescriptor serviceDescriptor) {
        EventServiceRMIBridgeService cached = bridges.get(serviceDescriptor);
        if (cached != null) {
            return cached;
        }
        try {
            EventServiceRMIBridgeService created = (EventServiceRMIBridgeService) Class.forName(BRIDGE_STUB_CLASS).getConstructor(ServiceDescriptor.class).newInstance(serviceDescriptor);
            // Another thread may have registered a stub in the meantime; the first one wins.
            EventServiceRMIBridgeService previous = bridges.putIfAbsent(serviceDescriptor, created);
            return previous == null ? created : previous;
        } catch (ClassNotFoundException e) {
            // Keep the cause so that the classloading problem stays diagnosable.
            throw new AssertionError("Misconfigured? can't find " + BRIDGE_STUB_CLASS, e);
        } catch (NoSuchMethodException e) {
            throw new AssertionError("Misconfigured? can't find " + BRIDGE_STUB_CLASS + " constructor", e);
        } catch (Exception e) {
            LOG.error("getBridge(" + serviceDescriptor + ")", e);
            return null;
        }
    }

    /**
     * Registers the local consumer at the registry and then at every returned supplier except this
     * instance itself.
     *
     * @param str name of the channel
     */
    private void localConsumerProxyCreated(String str) {
        ServiceDescriptor homeReference = getHomeReference();
        for (ServiceDescriptor supplier : EventServiceRegistryUtil.registerConsumerAtRegistryAndGetSuppliers(str, homeReference)) {
            if (homeReference.equals(supplier)) {
                LOG.debug("Skipped registering at myself");
            } else {
                registerAsConsumerAtRemoteSupplier(str, supplier);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Registers this instance as consumer of the channel at the given remote supplier.
     *
     * <p>The supplier is reported as broken (and its cached bridge dropped) when no bridge is
     * available, when the instance id answered by the bridge differs from the one in the
     * descriptor, or when the remote call fails.
     *
     * @param str name of the channel
     * @param serviceDescriptor descriptor of the remote supplier
     */
    public void registerAsConsumerAtRemoteSupplier(String str, ServiceDescriptor serviceDescriptor) {
        EventServiceRMIBridgeService bridge = getBridge(serviceDescriptor);
        if (bridge == null) {
            notifyBrokenSupplier(serviceDescriptor);
            return;
        }
        try {
            String instanceId = bridge.getInstanceId();
            if (instanceId.equals(serviceDescriptor.getInstanceId())) {
                LOG.debug("Registering @ {}", serviceDescriptor);
                bridge.registerRemoteConsumer(str, descriptor);
            } else {
                LOG.info("Instanceid mismatch, expected {}, received {} throwing away", serviceDescriptor.getInstanceId(), instanceId);
                notifyBrokenSupplier(serviceDescriptor);
            }
        } catch (RuntimeException e) {
            LOG.error("can't connect to : registerAsConsumerAtRemoteSupplier(" + str + DataspacePersistenceConfiguration.SEPARATOR + serviceDescriptor + ")", e);
            notifyBrokenSupplier(serviceDescriptor);
        } catch (EventServiceRMIBridgeServiceException e) {
            LOG.error("can't connect to : registerAsConsumerAtRemoteSupplier(" + str + DataspacePersistenceConfiguration.SEPARATOR + serviceDescriptor + ")", e);
            notifyBrokenSupplier(serviceDescriptor);
        }
    }

    /**
     * Reports the supplier as not available to the registry (best effort) and drops its cached bridge.
     *
     * @param serviceDescriptor descriptor of the broken supplier
     */
    private void notifyBrokenSupplier(ServiceDescriptor serviceDescriptor) {
        try {
            EventServiceRegistryUtil.notifySupplierNotAvailable(serviceDescriptor);
        } catch (Exception e) {
            LOG.warn("notifyBrokenSupplier(" + serviceDescriptor + ") failed", e);
        }
        bridges.remove(serviceDescriptor);
    }

    /**
     * Reports the consumer as not available to the registry (best effort) and drops its cached bridge.
     *
     * @param serviceDescriptor descriptor of the broken consumer
     */
    private void notifyBrokenConsumer(ServiceDescriptor serviceDescriptor) {
        try {
            EventServiceRegistryUtil.notifyConsumerNotAvailable(serviceDescriptor);
        } catch (Exception e) {
            LOG.warn("notifyBrokenConsumer(" + serviceDescriptor + ") failed ", e);
        }
        bridges.remove(serviceDescriptor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a broken remote consumer from the consumer proxy of its channel and reports it as
     * not available.
     *
     * @param remoteConsumerWrapper wrapper of the broken remote consumer
     */
    public void notifyBrokenConsumer(RemoteConsumerWrapper remoteConsumerWrapper) {
        LOG.debug("NOTIFY brokenConsumer: {}", remoteConsumerWrapper);
        EventChannel channel = es.obtainEventChannel(remoteConsumerWrapper.getChannelName(), ProxyType.REMOTE_CONSUMER_PROXY);
        ((DiMeRemoteEventChannelConsumerProxy) channel).removeRemoteConsumer(remoteConsumerWrapper);
        notifyBrokenConsumer(remoteConsumerWrapper.getHomeReference());
    }

    /**
     * Registers the local supplier at the registry and announces it to every returned consumer.
     *
     * <p>A consumer is reported as broken when no bridge is available for it, when its instance id
     * does not match the descriptor, or when the bridge call fails with a runtime exception. A
     * failure of one consumer never prevents the remaining consumers from being contacted.
     *
     * @param str name of the channel
     */
    private void localSupplierProxyCreated(String str) {
        ServiceDescriptor homeReference = getHomeReference();
        List<ServiceDescriptor> consumers = EventServiceRegistryUtil.registerSupplierAtRegistryAndGetConsumers(str, homeReference);
        LOG.debug("Consumers: {}", consumers);
        for (ServiceDescriptor consumer : consumers) {
            EventServiceRMIBridgeService bridge = getBridge(consumer);
            if (bridge == null) {
                notifyBrokenConsumer(consumer);
                continue;
            }
            try {
                String instanceId = bridge.getInstanceId();
                if (instanceId.equals(consumer.getInstanceId())) {
                    LOG.debug("Registering @ {}", consumer);
                    bridge.registerRemoteSupplier(str, homeReference);
                } else {
                    LOG.debug("Instanceid mismatch, expected {}, received {} throwing away", consumer.getInstanceId(), instanceId);
                    notifyBrokenConsumer(consumer);
                }
            } catch (EventServiceRMIBridgeServiceException e) {
                LOG.error("localSupplierProxyCreated", e);
            } catch (RuntimeException e) {
                // Mirrors registerAsConsumerAtRemoteSupplier: without this catch a single
                // unreachable consumer would abort the loop and skip all remaining consumers.
                LOG.error("localSupplierProxyCreated(" + str + ") failed for consumer " + consumer, e);
                notifyBrokenConsumer(consumer);
            }
        }
    }

    /**
     * Logs the destruction of a channel; no further cleanup is performed here.
     *
     * @param str name of the destroyed channel
     * @param proxyType type of the destroyed proxy (unused)
     */
    @Override // net.anotheria.anoprise.eventservice.EventServiceListener
    public void channelDestroyed(String str, ProxyType proxyType) {
        LOG.info("Channel {} destroyed", str);
    }

    /**
     * Initializes and locally registers the RMI bridge server and refreshes the local descriptor.
     * Failures are logged and not propagated.
     */
    public static final void initEventService() {
        try {
            EventServiceRMIBridgeServer.init();
            EventServiceRMIBridgeServer.createServiceAndRegisterLocally();
            descriptor = createLocalDescriptor();
        } catch (Exception e) {
            LOG.error("Can't init eventservice - probably running without events", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds a remote consumer to the remote consumer proxy of the given channel.
     *
     * @param str name of the channel
     * @param serviceDescriptor descriptor of the remote consumer
     */
    public void registerRemoteConsumer(String str, ServiceDescriptor serviceDescriptor) {
        EventChannel channel = es.obtainEventChannel(str, ProxyType.REMOTE_CONSUMER_PROXY);
        LOG.debug("REGISTER REMOTE CONSUMER @ channel {}, consumer: {}", channel, serviceDescriptor);
        RemoteConsumerWrapper wrapper = new RemoteConsumerWrapper(this, str, serviceDescriptor, getBridge(serviceDescriptor));
        ((DiMeRemoteEventChannelConsumerProxy) channel).addRemoteConsumer(wrapper);
        LOG.debug("REGISTER REMOTE CONSUMER @ channel {}, consumer: {} DONE!", channel, serviceDescriptor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules the registration of this instance as consumer at the announced remote supplier.
     * The registration runs on the internal executor, so this method returns immediately.
     *
     * @param str name of the channel
     * @param serviceDescriptor descriptor of the remote supplier
     */
    public void registerRemoteSupplier(final String str, final ServiceDescriptor serviceDescriptor) {
        LOG.debug("Register remote supplier {} for channel {} called.", serviceDescriptor, str);
        this.executorService.execute(new Runnable() { // from class: org.distributeme.support.eventservice.DiMeRemoteEventChannelRMISupport.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    registerAsConsumerAtRemoteSupplier(str, serviceDescriptor);
                } catch (Exception e) {
                    // Pass the exception to the logger; otherwise the failure reason is lost.
                    LOG.error("Can't register as consumer at remote supplier, channel: " + str + ", myRef: " + serviceDescriptor, e);
                }
            }
        });
    }

    /**
     * Hands the payload of a received event to the remote supplier proxy of its channel.
     *
     * @param eventTransportShell transport container holding the channel name and the event data
     */
    public void deliverEvent(EventTransportShell eventTransportShell) {
        EventChannel channel = es.obtainEventChannel(eventTransportShell.getChannelName(), ProxyType.REMOTE_SUPPLIER_PROXY);
        ((DiMeRemoteEventChannelSupplierProxy) channel).deliverEvent(eventTransportShell.getData());
    }
}
