package com.github.cherrythefatbunny.reactive.dubbo.extensions.proxyfactory;

import com.github.cherrythefatbunny.reactive.dubbo.extensions.Constants;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.proxy.InvokerInvocationHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/github/cherrythefatbunny/reactive/dubbo/extensions/proxyfactory/ReactiveInvokerInvocationHandler.class */
public class ReactiveInvokerInvocationHandler extends InvokerInvocationHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveInvokerInvocationHandler.class);
    private final Invoker<?> invoker;

    public ReactiveInvokerInvocationHandler(Invoker<?> invoker) {
        super(invoker);
        this.invoker = invoker;
    }

    public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
        Class<?> returnType = method.getReturnType();
        if (!Publisher.class.isAssignableFrom(returnType)) {
            return super.invoke(obj, method, objArr);
        }
        RpcInvocation createInvocation = createInvocation(method, objArr);
        if (Mono.class.isAssignableFrom(returnType)) {
            createInvocation.setAttachment(Constants.KEY_PUBLISHER_TYPE, Constants.VALUE_PUBLISHER_MONO);
            return Mono.create(monoSink -> {
                try {
                    ((CompletableFuture) this.invoker.invoke(createInvocation).recreate()).whenComplete((obj2, th) -> {
                        if (th != null) {
                            monoSink.error(th);
                        } else {
                            monoSink.success(obj2);
                        }
                    });
                } catch (Throwable th2) {
                    if (LOGGER.isErrorEnabled()) {
                        LOGGER.error("mono invocation", th2);
                    }
                    monoSink.error(th2);
                }
            });
        }
        if (!Flux.class.isAssignableFrom(returnType)) {
            throw new IllegalArgumentException(String.format("%s not supported now", method.getReturnType().getSimpleName()));
        }
        createInvocation.setAttachment(Constants.KEY_PUBLISHER_TYPE, Constants.VALUE_PUBLISHER_FLUX);
        return Flux.create(fluxSink -> {
            try {
                ((CompletableFuture) this.invoker.invoke(createInvocation).recreate()).whenComplete((obj2, th) -> {
                    if (th != null) {
                        fluxSink.error(th);
                        return;
                    }
                    if (!(obj2 instanceof List)) {
                        fluxSink.error(new IllegalArgumentException("unexpected return type:" + obj2.getClass()));
                        return;
                    }
                    List list = (List) obj2;
                    if (list != null) {
                        fluxSink.getClass();
                        list.forEach(fluxSink::next);
                    }
                    fluxSink.complete();
                });
            } catch (Throwable th2) {
                if (LOGGER.isErrorEnabled()) {
                    LOGGER.error("flux invocation", th2);
                }
                fluxSink.error(th2);
            }
        });
    }

    protected RpcInvocation createInvocation(Method method, Object[] objArr) {
        RpcInvocation rpcInvocation = new RpcInvocation(method, objArr);
        rpcInvocation.setAttachment("future_returntype", "true");
        rpcInvocation.setAttachment("async", "true");
        return rpcInvocation;
    }
}
