package com.github.cherrythefatbunny.reactive.dubbo.extensions.proxyfactory;

import com.github.cherrythefatbunny.reactive.dubbo.extensions.Constants;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.proxy.AbstractProxyInvoker;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/github/cherrythefatbunny/reactive/dubbo/extensions/proxyfactory/AbstractReactiveProxyInvoker.class */
public abstract class AbstractReactiveProxyInvoker<T> extends AbstractProxyInvoker<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractReactiveProxyInvoker.class);
    protected final T proxy;
    private final Class<T> type;
    private final URL url;

    public AbstractReactiveProxyInvoker(T t, Class<T> cls, URL url) {
        super(t, cls, url);
        this.proxy = t;
        this.type = cls;
        this.url = url;
    }

    public Result invoke(Invocation invocation) throws RpcException {
        String attachment = invocation.getAttachment(Constants.KEY_PUBLISHER_TYPE);
        if (StringUtils.isBlank(attachment)) {
            return super.invoke(invocation);
        }
        RpcContext context = RpcContext.getContext();
        try {
            Object doInvoke = doInvoke(this.proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            Mono mono = null;
            if (attachment.equals(Constants.VALUE_PUBLISHER_MONO)) {
                mono = (Mono) doInvoke;
            } else if (attachment.equals(Constants.VALUE_PUBLISHER_FLUX)) {
                mono = ((Flux) doInvoke).collect(ArrayList::new, (v0, v1) -> {
                    v0.add(v1);
                });
            }
            if (mono != null) {
                return new AsyncRpcResult(mono.toFuture());
            }
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.completeExceptionally(new IllegalArgumentException("unexpected publisher type:" + attachment));
            return new AsyncRpcResult(completableFuture);
        } catch (InvocationTargetException e) {
            if (context.isAsyncStarted() && !context.stopAsync()) {
                LOGGER.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return new RpcResult(e.getTargetException());
        } catch (Throwable th) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + th.getMessage(), th);
        }
    }
}
