package io.scalecube.services;

import io.scalecube.transport.Message;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import rx.Observable;
import rx.Subscription;

/* loaded from: input_file:io/scalecube/services/ServiceDispatcher.class */
/**
 * Routes incoming service-request messages to service instances registered locally.
 *
 * <p>On construction the dispatcher subscribes to the message stream of the
 * {@link ServiceCommunicator} obtained from {@link Microservices#sender()} and handles every
 * message for which {@code ServiceHeaders.serviceRequest(message)} is non-null.
 */
public class ServiceDispatcher {
    private final ServiceCommunicator sender;
    private final ServiceRegistry registry;
    private final Subscriptions subscriptions;

    /**
     * Creates the dispatcher and starts listening for service requests.
     *
     * @param microservices source of the communicator and the service registry; also passed to
     *     the {@link Subscriptions} holder created here
     */
    public ServiceDispatcher(Microservices microservices) {
        this.sender = microservices.sender();
        this.registry = microservices.serviceRegistry();
        this.subscriptions = new Subscriptions(microservices);
        // Subscribe last: every field is assigned before the first callback can be delivered.
        this.sender.listen()
                .filter(message -> ServiceHeaders.serviceRequest(message) != null)
                .subscribe(this::onServiceRequest);
    }

    /**
     * Handles a single service request.
     *
     * <ul>
     *   <li>If no {@link LocalServiceInstance} is registered for the requested service and
     *       method, the dispatching future is completed exceptionally.</li>
     *   <li>If the target method's return type is {@link CompletableFuture}, the instance is
     *       invoked and its result is handed to the dispatching future.</li>
     *   <li>If the return type is {@link Observable} and no subscription is tracked yet for the
     *       request's correlation id, the instance's stream is subscribed and every emitted
     *       message is sent to the request's sender.</li>
     *   <li>Any other case is ignored without a reply.</li>
     * </ul>
     *
     * <p>Any {@link Exception} thrown while dispatching is passed to the dispatching future.
     *
     * @param message the incoming request message
     */
    private void onServiceRequest(Message message) {
        Optional<ServiceInstance> localInstance = this.registry.getLocalInstance(
                ServiceHeaders.serviceRequest(message), ServiceHeaders.serviceMethod(message));
        DispatchingFuture from = DispatchingFuture.from(this.sender, message);
        try {
            if (!localInstance.isPresent() || !(localInstance.get() instanceof LocalServiceInstance)) {
                from.completeExceptionally(new IllegalStateException("Service instance is was not found: " + message.qualifier()));
                return;
            }

            // Keep the ServiceInstance static type for invoke/listen; the cast is needed only
            // to look up the reflective method.
            ServiceInstance instance = localInstance.get();
            Method method = ((LocalServiceInstance) instance).getMethod(message);
            Class<?> returnType = method.getReturnType();

            if (returnType.equals(CompletableFuture.class)) {
                from.complete(instance.invoke(message));
            } else if (returnType.equals(Observable.class)) {
                String correlationId = message.correlationId();
                // A correlation id that is already tracked means the stream is already being
                // served; the duplicate request is ignored.
                if (!this.subscriptions.contains(correlationId)) {
                    // doOnTerminate runs on both completion and error, so a single hook is
                    // enough to drop the tracked subscription when the stream ends.
                    // NOTE(review): no onError handler is supplied to subscribe(); confirm how
                    // stream errors should be reported to the caller.
                    Subscription subscribe = instance.listen(message)
                            .doOnTerminate(() -> this.subscriptions.unsubscribe(correlationId))
                            .subscribe(message2 -> this.sender.send(message.sender(), message2));
                    // The stream may have terminated during subscribe(); only track it if it
                    // is still active.
                    if (!subscribe.isUnsubscribed()) {
                        this.subscriptions.put(correlationId, new ServiceSubscription(correlationId, subscribe, this.sender.cluster().member().id()));
                    }
                }
            }
        } catch (Exception e) {
            from.completeExceptionally(e);
        }
    }
}
