package io.dstream;

import io.dstream.DStreamInvocationChain;
import io.dstream.utils.Assert;
import io.dstream.utils.PropertiesHelper;
import io.dstream.utils.ReflectionUtils;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/dstream/DStreamExecutionGraphsBuilder.class */
/**
 * Records the operations invoked on a stream interface into a
 * {@link DStreamInvocationChain} by handing out JDK dynamic proxies.
 *
 * <p>Each builder owns exactly one proxy ({@code targetStream}) and one invocation
 * chain. Invoking a stream operation on the proxy does not mutate this builder;
 * it creates a new builder whose chain is a copy of the current one plus the new
 * invocation, and returns that builder's proxy. Invoking {@code executeAs} hands
 * the recorded chain to the {@link DStreamExecutionDelegate} named in the
 * execution's {@code .cfg} properties.
 *
 * <p>NOTE: this class was recovered by a decompiler which widened several access
 * modifiers to {@code public}; they are kept as found so existing callers compile.
 *
 * @param <T> element type of the source, as passed to {@link #as(Class, String, Class)}
 * @param <R> type of the stream interface implemented by the proxy
 */
public final class DStreamExecutionGraphsBuilder<T, R> {
    // The class is final, so this is the same logger name the per-instance lookup used.
    private static final Logger LOGGER = Logger.getLogger(DStreamExecutionGraphsBuilder.class.getName());

    private final R targetStream;
    private final DStreamInvocationChain invocationPipeline;
    private final Set<String> streamOperationNames;
    private final Class<?> currentStreamType;

    /**
     * Implemented by every generated proxy (in addition to the stream interface) so
     * that the invocation chain behind a proxy can be retrieved via {@link #get()}.
     */
    public interface StreamInvocationChainSupplier {
        /**
         * @return the invocation chain recorded by the builder backing this proxy
         */
        DStreamInvocationChain get();
    }

    /**
     * Invocation handler installed on the generated proxy; forwards every call to
     * {@link DStreamExecutionGraphsBuilder#invoke(Object, Method, Object[])} of the
     * enclosing builder.
     */
    public class StreamInvocationHandler implements InvocationHandler {
        private StreamInvocationHandler() {
        }

        @Override // java.lang.reflect.InvocationHandler
        public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
            return DStreamExecutionGraphsBuilder.this.invoke(obj, method, objArr);
        }
    }

    /**
     * Tracks, per thread, the stream names handed to {@link #as(Class, String, Class)}
     * and rejects duplicates until {@link #reset()} is called.
     */
    public static class StreamNameMonitor {
        private static final ThreadLocal<Set<String>> NAMES = ThreadLocal.withInitial(HashSet::new);

        StreamNameMonitor() {
        }

        /**
         * Registers a stream name for the current thread.
         *
         * @param str the stream name; the assertion fails if this thread has
         *            already registered it
         */
        static void add(String str) {
            Set<String> names = NAMES.get();
            Assert.isFalse(names.contains(str), "Stream with the name '" + str + "' already exists");
            names.add(str);
        }

        /**
         * Forgets all names registered by the current thread.
         */
        static void reset() {
            NAMES.get().clear();
        }
    }

    /**
     * Creates a new stream proxy with an empty invocation chain.
     *
     * @param cls  source element type recorded in the invocation chain
     * @param str  stream name; must not have been used on this thread since the
     *             last {@code executeAs} call
     * @param cls2 stream interface the returned proxy implements
     * @return a proxy implementing {@code cls2} and {@link StreamInvocationChainSupplier}
     */
    public static <T, R> R as(Class<T> cls, String str, Class<R> cls2) {
        StreamNameMonitor.add(str);
        return new DStreamExecutionGraphsBuilder<T, R>(cls, str, cls2).targetStream;
    }

    /**
     * @param cls    source element type
     * @param str    source identifier (stream name)
     * @param clsArr interfaces the proxy must implement; the first one is the stream
     *               type whose methods are treated as recordable stream operations
     */
    @SafeVarargs // the array is only read, never written to
    private DStreamExecutionGraphsBuilder(Class<?> cls, String str, Class<R>... clsArr) {
        this.currentStreamType = clsArr[0];
        this.targetStream = generateStreamProxy(clsArr);
        this.invocationPipeline = new DStreamInvocationChain(cls, str, clsArr[0]);
        this.streamOperationNames = ReflectionUtils.findAllVisibleMethodOnInterface(clsArr[0]);
        // These interface methods are handled explicitly in invoke(..) instead of
        // being recorded as stream operations.
        this.streamOperationNames.remove("ofType");
        this.streamOperationNames.remove("executeAs");
        this.streamOperationNames.remove("getName");
    }

    /**
     * @return the source identifier followed by the names of the recorded invocations
     */
    @Override
    public String toString() {
        return this.invocationPipeline.getSourceIdentifier() + ":" + this.invocationPipeline.getInvocations().stream()
                .map(invocation -> invocation.getMethod().getName())
                .collect(Collectors.toList());
    }

    /**
     * Creates the successor builder for a stream operation: same source, a copy of
     * the invocations recorded so far, plus the given operation.
     *
     * @param method the stream operation that was invoked on the proxy
     * @param objArr the operation's arguments (never {@code null})
     * @return the proxy of the successor builder
     */
    private R cloneTargetDistributable(Method method, Object[] objArr) {
        Ops operation = Ops.valueOf(method.getName());
        Object[] arguments = objArr;
        if (operation.equals(Ops.join) || operation.equals(Ops.union) || operation.equals(Ops.unionAll)) {
            // The argument is another stream proxy; record its invocation chain
            // rather than the proxy itself.
            arguments = new Object[]{((StreamInvocationChainSupplier) objArr[0]).get()};
        }

        // The successor exposes the operation's return type when that is an
        // interface; otherwise it keeps the current stream type.
        Class<?> returnType = method.getReturnType();
        @SuppressWarnings("unchecked") // the proxy is only ever handed back through reflective calls
        Class<R> nextStreamType = (Class<R>) (returnType.isInterface() ? returnType : this.currentStreamType);

        DStreamExecutionGraphsBuilder<T, R> successor = new DStreamExecutionGraphsBuilder<T, R>(
                this.invocationPipeline.getSourceElementType(),
                this.invocationPipeline.getSourceIdentifier(),
                nextStreamType);
        successor.invocationPipeline.addAllInvocations(this.invocationPipeline.getInvocations());
        if (operation.equals(Ops.on)) {
            // 'on' is not recorded as its own invocation; it decorates the previous one.
            successor.invocationPipeline.getLastInvocation().setSupplementaryOperation(arguments[0]);
        } else {
            successor.invocationPipeline.addInvocation(new DStreamInvocationChain.DStreamInvocation(method, arguments));
        }
        return successor.targetStream;
    }

    /**
     * Creates the dynamic proxy backing this builder.
     *
     * @param clsArr interfaces the proxy must implement;
     *               {@link StreamInvocationChainSupplier} is always appended
     * @return the new proxy instance
     */
    @SuppressWarnings("unchecked") // the proxy implements clsArr[0], which callers pass as Class<R>
    private R generateStreamProxy(Class<?>... clsArr) {
        // Plain array copy: does not rely on Collectors.toList() returning a mutable list.
        Class<?>[] interfaces = new Class<?>[clsArr.length + 1];
        System.arraycopy(clsArr, 0, interfaces, 0, clsArr.length);
        interfaces[clsArr.length] = StreamInvocationChainSupplier.class;
        return (R) Proxy.newProxyInstance(getClass().getClassLoader(), interfaces, new StreamInvocationHandler());
    }

    /**
     * Dispatches a call made on the stream proxy.
     *
     * <ul>
     *   <li>a recorded stream operation returns the proxy of a successor builder;</li>
     *   <li>{@code getName} returns the source identifier;</li>
     *   <li>{@code get} returns the invocation chain;</li>
     *   <li>{@code executeAs} delegates execution (see {@link #executeAs(Object[])});</li>
     *   <li>anything else (e.g. {@code hashCode}, {@code toString}) is answered by
     *       this builder, with {@code equals} answered by proxy identity.</li>
     * </ul>
     *
     * @param obj    the proxy the method was invoked on
     * @param method the invoked method
     * @param objArr the call arguments, or {@code null} for a no-arg method
     * @return the result of the call
     * @throws Throwable whatever the dispatched operation throws
     */
    public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
        String name = method.getName();
        if (this.streamOperationNames.contains(name)) {
            LOGGER.info("Invoking OPERATION: " + name);
            return cloneTargetDistributable(method, objArr == null ? new Object[0] : objArr);
        }
        if (name.equals("getName")) {
            return this.invocationPipeline.getSourceIdentifier();
        }
        if (name.equals("get")) {
            return this.invocationPipeline;
        }
        if (name.equals("executeAs")) {
            return executeAs(objArr);
        }
        return invokeOnBuilder(obj, method, objArr);
    }

    /**
     * Handles {@code executeAs(executionName)}: resets the current thread's stream
     * name registry, loads {@code <executionName>.cfg}, instantiates the delegate
     * class named by the {@link DStreamConstants#DELEGATE} property using the
     * thread context class loader, and lets it execute the built graph.
     *
     * @param objArr call arguments; the first element is the execution name
     * @return whatever the delegate's {@code execute} call returns
     * @throws Throwable mirrors the {@link InvocationHandler} contract; includes
     *                   {@link ClassNotFoundException} for an unknown delegate class
     */
    private Object executeAs(Object[] objArr) throws Throwable {
        StreamNameMonitor.reset();
        String executionName = (String) objArr[0];
        Assert.notEmpty(executionName, "'executionName' must not be null or empty");
        Properties executionConfig = PropertiesHelper.loadProperties(executionName + ".cfg");
        String delegateClassName = executionConfig.getProperty(DStreamConstants.DELEGATE);
        Assert.notEmpty(delegateClassName, "Execution delegate property is not provided in '" + executionName + ".cfg' (e.g., dstream.delegate=foo.bar.SomePipelineDelegate)");
        LOGGER.info("Delegating execution to: " + delegateClassName);
        Class<?> delegateClass = Class.forName(delegateClassName, true, Thread.currentThread().getContextClassLoader());
        DStreamExecutionDelegate delegate = (DStreamExecutionDelegate) ReflectionUtils.newDefaultInstance(delegateClass);
        return delegate.execute(executionName, executionConfig, new DStreamExecutionGraphBuilder(this.invocationPipeline, executionConfig).build());
    }

    /**
     * Answers a call that is neither a stream operation nor one of the specially
     * handled methods by invoking it on this builder.
     *
     * @param proxy  the proxy the method was invoked on
     * @param method the invoked method
     * @param args   the call arguments, or {@code null}
     * @return the method's result
     * @throws Throwable the exception thrown by the invoked method, unwrapped
     */
    private Object invokeOnBuilder(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class && "equals".equals(method.getName())) {
            // Forwarding equals to the builder would compare the builder with the
            // proxy and always yield false, even for proxy.equals(proxy).
            return proxy == args[0];
        }
        try {
            return method.invoke(this, args);
        } catch (InvocationTargetException e) {
            // Surface the real failure instead of an UndeclaredThrowableException.
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
    }
}
