package io.rsocket.ipc;

import io.opentracing.Tracer;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ipc.MetadataDecoder;
import io.rsocket.ipc.decoders.CompositeMetadataDecoder;
import io.rsocket.ipc.exception.RouteNotFound;
import io.rsocket.ipc.util.IPCChannelFunction;
import io.rsocket.ipc.util.IPCFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/ipc/RoutingServerRSocket.class */
public class RoutingServerRSocket implements RSocket {
    final Router router;
    final MetadataDecoder decoder;

    public RoutingServerRSocket(Router router) {
        this(new CompositeMetadataDecoder(), router);
    }

    public RoutingServerRSocket(Tracer tracer, Router router) {
        this(new CompositeMetadataDecoder(tracer), router);
    }

    public RoutingServerRSocket(MetadataDecoder metadataDecoder, Router router) {
        this.decoder = metadataDecoder;
        this.router = router;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        try {
            MetadataDecoder.Metadata decode = this.decoder.decode(payload.sliceMetadata());
            String route = decode.route();
            IPCFunction<Mono<Void>> routeFireAndForget = this.router.routeFireAndForget(route);
            if (routeFireAndForget == null) {
                return Mono.error(new RouteNotFound("Nothing found for route " + route));
            }
            Mono<Void> apply = routeFireAndForget.apply(payload, decode);
            payload.release();
            return apply;
        } catch (Throwable th) {
            payload.release();
            return Mono.error(th);
        }
    }

    public Mono<Payload> requestResponse(Payload payload) {
        try {
            MetadataDecoder.Metadata decode = this.decoder.decode(payload.sliceMetadata());
            String route = decode.route();
            IPCFunction<Mono<Payload>> routeRequestResponse = this.router.routeRequestResponse(route);
            if (routeRequestResponse == null) {
                return Mono.error(new NullPointerException("nothing found for route " + route));
            }
            Mono<Payload> apply = routeRequestResponse.apply(payload, decode);
            payload.release();
            return apply;
        } catch (Throwable th) {
            payload.release();
            return Mono.error(th);
        }
    }

    public Flux<Payload> requestStream(Payload payload) {
        try {
            MetadataDecoder.Metadata decode = this.decoder.decode(payload.sliceMetadata());
            String route = decode.route();
            IPCFunction<Flux<Payload>> routeRequestStream = this.router.routeRequestStream(route);
            if (routeRequestStream == null) {
                return Flux.error(new NullPointerException("nothing found for route " + route));
            }
            Flux<Payload> apply = routeRequestStream.apply(payload, decode);
            payload.release();
            return apply;
        } catch (Throwable th) {
            payload.release();
            return Flux.error(th);
        }
    }

    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, flux) -> {
            Payload payload = (Payload) signal.get();
            return payload != null ? doRequestChannel(payload, flux) : flux;
        });
    }

    private Flux<Payload> doRequestChannel(Payload payload, Flux<Payload> flux) {
        try {
            MetadataDecoder.Metadata decode = this.decoder.decode(payload.sliceMetadata());
            String route = decode.route();
            IPCChannelFunction routeRequestChannel = this.router.routeRequestChannel(route);
            if (routeRequestChannel != null) {
                return routeRequestChannel.apply(flux, payload, decode);
            }
            payload.release();
            return Flux.error(new NullPointerException("nothing found for route " + route));
        } catch (Throwable th) {
            payload.release();
            return Flux.error(th);
        }
    }
}
