package org.apache.aries.rsa.topologymanager.importer;

import java.util.Dictionary;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.osgi.framework.BundleContext;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
import org.osgi.service.remoteserviceadmin.ImportReference;
import org.osgi.service.remoteserviceadmin.ImportRegistration;
import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
import org.osgi.service.remoteserviceadmin.RemoteServiceAdminEvent;
import org.osgi.service.remoteserviceadmin.RemoteServiceAdminListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.class */
public class TopologyManagerImport implements EndpointEventListener, RemoteServiceAdminListener {
    private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerImport.class);
    private final BundleContext bctx;
    private volatile boolean stopped;
    private final MultiMap<String, EndpointDescription> importPossibilities = new MultiMap<>();
    private final MultiMap<String, ImportRegistration> importedServices = new MultiMap<>();
    private final Set<RemoteServiceAdmin> rsaSet = new CopyOnWriteArraySet();
    private final ExecutorService execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory(getClass()));

    public TopologyManagerImport(BundleContext bundleContext) {
        this.bctx = bundleContext;
    }

    public void start() {
        this.stopped = false;
        this.bctx.registerService(RemoteServiceAdminListener.class, this, (Dictionary) null);
    }

    public void stop() {
        this.stopped = true;
        this.execService.shutdown();
        try {
            this.execService.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for {} to terminate", this.execService);
            Thread.currentThread().interrupt();
        }
        this.importPossibilities.clear();
        this.importedServices.allValues().forEach(this::unimportRegistration);
    }

    public void add(RemoteServiceAdmin remoteServiceAdmin) {
        this.rsaSet.add(remoteServiceAdmin);
        this.importPossibilities.keySet().forEach(this::synchronizeImportsAsync);
    }

    public void remove(RemoteServiceAdmin remoteServiceAdmin) {
        this.rsaSet.remove(remoteServiceAdmin);
    }

    public void remoteAdminEvent(RemoteServiceAdminEvent remoteServiceAdminEvent) {
        ImportReference importReference = remoteServiceAdminEvent.getImportReference();
        if (remoteServiceAdminEvent.getType() != 4 || importReference == null) {
            return;
        }
        this.importedServices.allValues().stream().filter(importRegistration -> {
            return importReference.equals(importRegistration.getImportReference());
        }).forEach(this::unimportRegistration);
    }

    private void synchronizeImportsAsync(String str) {
        LOG.debug("Import of a service for filter {} was queued", str);
        if (this.rsaSet.isEmpty()) {
            return;
        }
        this.execService.execute(() -> {
            synchronizeImports(str);
        });
    }

    private void synchronizeImports(String str) {
        try {
            ImportDiff importDiff = new ImportDiff(this.importPossibilities.get(str), this.importedServices.get(str));
            importDiff.getRemoved().forEach(this::unimportRegistration);
            importDiff.getAdded().flatMap(this::importService).forEach(importRegistration -> {
                this.importedServices.put(str, importRegistration);
            });
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    private Stream<ImportRegistration> importService(EndpointDescription endpointDescription) {
        Iterator<RemoteServiceAdmin> it = this.rsaSet.iterator();
        while (it.hasNext()) {
            ImportRegistration importService = it.next().importService(endpointDescription);
            if (importService != null) {
                if (importService.getException() == null) {
                    LOG.debug("Service import was successful {}", importService);
                    return Stream.of(importService);
                }
                LOG.info("Error importing service " + endpointDescription, importService.getException());
            }
        }
        return Stream.empty();
    }

    private void unimportRegistration(ImportRegistration importRegistration) {
        this.importedServices.remove(importRegistration);
        importRegistration.close();
    }

    public void endpointChanged(EndpointEvent endpointEvent, String str) {
        if (this.stopped) {
            return;
        }
        EndpointDescription endpoint = endpointEvent.getEndpoint();
        LOG.debug("Endpoint event received type {}, filter {}, endpoint {}", new Object[]{Integer.valueOf(endpointEvent.getType()), str, endpoint});
        switch (endpointEvent.getType()) {
            case 1:
                this.importPossibilities.put(str, endpoint);
                break;
            case 2:
            case 8:
                this.importPossibilities.remove(str, endpoint);
                break;
            case 4:
                this.importPossibilities.remove(str, endpoint);
                this.importPossibilities.put(str, endpoint);
                break;
        }
        synchronizeImportsAsync(str);
    }
}
