package io.reactivex.netty;

import io.reactivex.netty.RemoteRxEvent;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.codec.Encoder;
import io.reactivex.netty.ingress.IngressPolicy;
import io.reactivex.netty.slotting.SlotAssignment;
import io.reactivex.netty.slotting.SlotValuePair;
import io.reactivex.netty.slotting.SlottingStrategy;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/netty/RemoteObservableConnectionHandler.class */
public class RemoteObservableConnectionHandler implements ConnectionHandler<RemoteRxEvent, RemoteRxEvent> {
    private static final Logger logger = LoggerFactory.getLogger(RemoteObservableConnectionHandler.class);
    private Map<String, RemoteObservableConfiguration> observables;
    private CountDownLatch blockUntilCompleted;
    private ServerMetrics serverMetrics;
    private IngressPolicy ingressPolicy;

    public RemoteObservableConnectionHandler(Map<String, RemoteObservableConfiguration> map, IngressPolicy ingressPolicy, CountDownLatch countDownLatch, ServerMetrics serverMetrics) {
        this.observables = map;
        this.ingressPolicy = ingressPolicy;
        this.blockUntilCompleted = countDownLatch;
        this.serverMetrics = serverMetrics;
    }

    @Override // io.reactivex.netty.channel.ConnectionHandler
    public Observable<Void> handle(ObservableConnection<RemoteRxEvent, RemoteRxEvent> observableConnection) {
        logger.debug("Connection received: " + observableConnection.toString());
        return this.ingressPolicy.allowed(observableConnection) ? setupConnection(observableConnection) : Observable.error(new RemoteObservableException("Connection rejected due to ingress policy"));
    }

    private <T> void subscribe(MutableReference<Subscription> mutableReference, final RemoteRxEvent remoteRxEvent, final ObservableConnection<RemoteRxEvent, RemoteRxEvent> observableConnection, RemoteObservableConfiguration<T> remoteObservableConfiguration, final SlotAssignment slotAssignment) {
        Observable<T> observable = remoteObservableConfiguration.getObservable();
        Func1<Map<String, String>, Func1<T, Boolean>> filterFunction = remoteObservableConfiguration.getFilterFunction();
        Func1<SlotValuePair<T>, Boolean> slottingFunction = remoteObservableConfiguration.getSlottingStrategy().slottingFunction();
        final Encoder<T> encoder = remoteObservableConfiguration.getEncoder();
        mutableReference.setValue(observable.filter(filterFunction.call(remoteRxEvent.getSubscribeParameters())).map(new Func1<T, SlotValuePair<T>>() { // from class: io.reactivex.netty.RemoteObservableConnectionHandler.3
            @Override // rx.functions.Func1
            public SlotValuePair<T> call(T t) {
                return new SlotValuePair<>(slotAssignment.getSlotAssignment().intValue(), t);
            }

            /* JADX WARN: Multi-variable type inference failed */
            @Override // rx.functions.Func1
            public /* bridge */ /* synthetic */ Object call(Object obj) {
                return call((AnonymousClass3<T>) obj);
            }
        }).filter(slottingFunction).map(new Func1<SlotValuePair<T>, T>() { // from class: io.reactivex.netty.RemoteObservableConnectionHandler.2
            @Override // rx.functions.Func1
            public T call(SlotValuePair<T> slotValuePair) {
                return slotValuePair.getValue();
            }
        }).subscribe(new Observer<T>() { // from class: io.reactivex.netty.RemoteObservableConnectionHandler.1
            @Override // rx.Observer
            public void onCompleted() {
                observableConnection.writeAndFlush(RemoteRxEvent.completed(remoteRxEvent.getName()));
                RemoteObservableConnectionHandler.this.blockUntilCompleted.countDown();
                RemoteObservableConnectionHandler.this.serverMetrics.incrementCompletedCount();
            }

            @Override // rx.Observer
            public void onError(Throwable th) {
                observableConnection.writeAndFlush(RemoteRxEvent.error(remoteRxEvent.getName(), RemoteObservable.fromThrowableToBytes(th)));
                RemoteObservableConnectionHandler.this.serverMetrics.incrementErrorCount();
            }

            @Override // rx.Observer
            public void onNext(T t) {
                observableConnection.writeAndFlush(RemoteRxEvent.next(remoteRxEvent.getName(), encoder.encode(t)));
                RemoteObservableConnectionHandler.this.serverMetrics.incrementNextCount();
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Void> handleSubscribeRequest(RemoteRxEvent remoteRxEvent, ObservableConnection<RemoteRxEvent, RemoteRxEvent> observableConnection, MutableReference<SlottingStrategy> mutableReference, MutableReference<Subscription> mutableReference2) {
        String name = remoteRxEvent.getName();
        RemoteObservableConfiguration remoteObservableConfiguration = this.observables.get(name);
        if (remoteObservableConfiguration == null) {
            return Observable.error(new RemoteObservableException("No remote observable configuration found for name: " + name));
        }
        if (remoteRxEvent.getType() == RemoteRxEvent.Type.subscribed) {
            SlottingStrategy slottingStrategy = remoteObservableConfiguration.getSlottingStrategy();
            mutableReference.setValue(slottingStrategy);
            SlotAssignment assignSlot = slottingStrategy.assignSlot(observableConnection);
            if (!assignSlot.isAssigned()) {
                return Observable.error(new RemoteObservableException("Slot could not be assigned for connection."));
            }
            subscribe(mutableReference2, remoteRxEvent, observableConnection, remoteObservableConfiguration, assignSlot);
            this.serverMetrics.incrementSubscribedCount();
            logger.debug("Connection: " + observableConnection.toString() + " subscribed to observable: " + name);
        }
        return Observable.empty();
    }

    private Observable<Void> setupConnection(final ObservableConnection<RemoteRxEvent, RemoteRxEvent> observableConnection) {
        final MutableReference mutableReference = new MutableReference();
        final MutableReference mutableReference2 = new MutableReference();
        return observableConnection.getInput().filter(new Func1<RemoteRxEvent, Boolean>() { // from class: io.reactivex.netty.RemoteObservableConnectionHandler.6
            @Override // rx.functions.Func1
            public Boolean call(RemoteRxEvent remoteRxEvent) {
                boolean z = false;
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.subscribed || remoteRxEvent.getType() == RemoteRxEvent.Type.unsubscribed) {
                    z = true;
                }
                return Boolean.valueOf(z);
            }
        }).flatMap(new Func1<RemoteRxEvent, Observable<Void>>() { // from class: io.reactivex.netty.RemoteObservableConnectionHandler.5
            @Override // rx.functions.Func1
            public Observable<Void> call(RemoteRxEvent remoteRxEvent) {
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.subscribed) {
                    return RemoteObservableConnectionHandler.this.handleSubscribeRequest(remoteRxEvent, observableConnection, mutableReference2, mutableReference);
                }
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.unsubscribed) {
                    Subscription subscription = (Subscription) mutableReference.getValue();
                    if (subscription != null) {
                        subscription.unsubscribe();
                    }
                    RemoteObservableConnectionHandler.this.releaseSlot((SlottingStrategy) mutableReference2.getValue(), observableConnection);
                    RemoteObservableConnectionHandler.this.serverMetrics.incrementUnsubscribedCount();
                    RemoteObservableConnectionHandler.logger.debug("Connection: " + observableConnection.toString() + " unsubscribed");
                }
                return Observable.empty();
            }
        }).doOnCompleted(new Action0() { // from class: io.reactivex.netty.RemoteObservableConnectionHandler.4
            @Override // rx.functions.Action0
            public void call() {
                RemoteObservableConnectionHandler.this.releaseSlot((SlottingStrategy) mutableReference2.getValue(), observableConnection);
                RemoteObservableConnectionHandler.logger.debug("Connection: " + observableConnection.toString() + " closed");
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void releaseSlot(SlottingStrategy slottingStrategy, ObservableConnection<RemoteRxEvent, RemoteRxEvent> observableConnection) {
        if (slottingStrategy != null) {
            slottingStrategy.releaseSlot(observableConnection);
        }
    }
}
