package org.apache.apex.malhar.stream.api.impl;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.Option;
import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.DagMeta;
import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import org.joda.time.Duration;

/**
 * Default {@link ApexStream} implementation.
 *
 * <p>Every transformation adds a node to a shared {@link DagMeta} and returns a
 * stream whose {@link Brick} points at that new node. The accumulated
 * {@link DagMeta} is turned into a {@link DAG} by {@link #populateDag(DAG)}.
 *
 * @param <T> type of the tuples flowing out of this stream
 */
@InterfaceStability.Evolving
public class ApexStreamImpl<T> implements ApexStream<T> {
    /** Attributes declared as constants on {@link Context.OperatorContext}. */
    private static final Set<Attribute<?>> OPERATOR_ATTRIBUTES = new HashSet<>();
    /**
     * Attributes declared as constants on {@link Context.DAGContext}.
     * NOTE(review): populated but never consulted inside this class.
     */
    private static final Set<Attribute<?>> DAG_ATTRIBUTES = new HashSet<>();
    /** {@link Context.PortContext} attributes that {@link #with(Attribute, Object)} applies to the input port. */
    private static final Set<Attribute<?>> INPUT_ATTRIBUTES = new HashSet<>();
    /** {@link Context.PortContext} attributes that {@link #with(Attribute, Object)} applies to the upstream output port. */
    private static final Set<Attribute<?>> OUTPUT_ATTRIBUTES = new HashSet<>();

    static {
        collectAttributeConstants(Context.OperatorContext.class, OPERATOR_ATTRIBUTES);
        collectAttributeConstants(Context.DAGContext.class, DAG_ATTRIBUTES);

        INPUT_ATTRIBUTES.add(Context.PortContext.PARTITION_PARALLEL);
        INPUT_ATTRIBUTES.add(Context.PortContext.AUTO_RECORD);
        INPUT_ATTRIBUTES.add(Context.PortContext.STREAM_CODEC);
        INPUT_ATTRIBUTES.add(Context.PortContext.TUPLE_CLASS);

        OUTPUT_ATTRIBUTES.add(Context.PortContext.QUEUE_CAPACITY);
        OUTPUT_ATTRIBUTES.add(Context.PortContext.BUFFER_MEMORY_MB);
        OUTPUT_ATTRIBUTES.add(Context.PortContext.SPIN_MILLIS);
        OUTPUT_ATTRIBUTES.add(Context.PortContext.UNIFIER_SINGLE_FINAL);
        OUTPUT_ATTRIBUTES.add(Context.PortContext.IS_OUTPUT_UNIFIED);
        OUTPUT_ATTRIBUTES.add(Context.PortContext.AUTO_RECORD);
        OUTPUT_ATTRIBUTES.add(Context.PortContext.STREAM_CODEC);
        OUTPUT_ATTRIBUTES.add(Context.PortContext.TUPLE_CLASS);
    }

    /** Graph metadata shared by every stream derived from the same root. */
    protected DagMeta graph;
    /** Position of this stream in {@link #graph}; {@code null} until an operator has been added. */
    protected Brick<T> lastBrick;

    /**
     * Records where a stream currently ends: the node it was created from, the
     * output port further operators attach to, and the (upstream output,
     * this node's input) port pair that connected the node to its predecessor.
     *
     * @param <T> tuple type emitted by {@link #getLastOutput()}
     */
    public static class Brick<T> {
        private Operator.OutputPort<T> lastOutput;
        private DagMeta.NodeMeta nodeMeta;
        private Pair<Operator.OutputPort, Operator.InputPort> lastStream;

        /** @return the output port the next operator will be connected to; may be {@code null} */
        public Operator.OutputPort<T> getLastOutput() {
            return this.lastOutput;
        }

        /** @param outputPort the output port the next operator will be connected to */
        public void setLastOutput(Operator.OutputPort<T> outputPort) {
            this.lastOutput = outputPort;
        }

        /** @param pair the (upstream output port, input port) pair feeding this brick's node */
        public void setLastStream(Pair<Operator.OutputPort, Operator.InputPort> pair) {
            this.lastStream = pair;
        }

        /** @return the (upstream output port, input port) pair feeding this brick's node, or {@code null} */
        public Pair<Operator.OutputPort, Operator.InputPort> getLastStream() {
            return this.lastStream;
        }
    }

    /** Creates an empty stream backed by a fresh {@link DagMeta}. */
    public ApexStreamImpl() {
        this.graph = new DagMeta();
    }

    /**
     * Creates a stream that shares the graph and position of another stream.
     * {@link #addOperator} looks this constructor up reflectively on subclasses.
     *
     * @param apexStreamImpl the stream to share state with
     */
    public ApexStreamImpl(ApexStreamImpl<T> apexStreamImpl) {
        this.graph = apexStreamImpl.graph;
        this.lastBrick = apexStreamImpl.lastBrick;
    }

    /**
     * Creates a stream over an existing graph with no current position.
     *
     * @param dagMeta the graph metadata to build on
     */
    public ApexStreamImpl(DagMeta dagMeta) {
        this(dagMeta, null);
    }

    /**
     * Creates a stream over an existing graph at the given position.
     *
     * @param dagMeta the graph metadata to build on
     * @param brick   the current end of the stream; may be {@code null}
     */
    public ApexStreamImpl(DagMeta dagMeta, Brick<T> brick) {
        this.graph = dagMeta;
        this.lastBrick = brick;
    }

    /** @return the current end of this stream, or {@code null} if no operator has been added */
    public Brick<T> getLastBrick() {
        return this.lastBrick;
    }

    /** @param brick the new end of this stream */
    public void setLastBrick(Brick<T> brick) {
        this.lastBrick = brick;
    }

    /**
     * Appends a {@link FunctionOperator.MapFunctionOperator} wrapping the given function.
     *
     * @param mapFunction the mapping function
     * @param optionArr   options forwarded to {@link #addOperator}
     * @return the stream positioned after the new operator
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction, Option... optionArr) {
        FunctionOperator.MapFunctionOperator mapOperator = new FunctionOperator.MapFunctionOperator(mapFunction);
        return (STREAM) addOperator(mapOperator, mapOperator.input, mapOperator.output, optionArr);
    }

    /**
     * Appends a {@link FunctionOperator.FlatMapFunctionOperator} wrapping the given function.
     *
     * @param flatMapFunction the flat-map function
     * @param optionArr       options forwarded to {@link #addOperator}
     * @return the stream positioned after the new operator
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatMapFunction, Option... optionArr) {
        FunctionOperator.FlatMapFunctionOperator flatMapOperator = new FunctionOperator.FlatMapFunctionOperator(flatMapFunction);
        return (STREAM) addOperator(flatMapOperator, flatMapOperator.input, flatMapOperator.output, optionArr);
    }

    /**
     * Appends a {@link FunctionOperator.FilterFunctionOperator} wrapping the given predicate.
     *
     * @param filterFunction the filter function
     * @param optionArr      options forwarded to {@link #addOperator}
     * @return the stream positioned after the new operator
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filterFunction, Option... optionArr) {
        FunctionOperator.FilterFunctionOperator filterOperator = new FunctionOperator.FilterFunctionOperator(filterFunction);
        return (STREAM) addOperator(filterOperator, filterOperator.input, filterOperator.output, optionArr);
    }

    /**
     * Placeholder; currently always returns {@code null}.
     * NOTE(review): the other unimplemented methods in this class throw
     * {@link UnsupportedOperationException}; confirm whether callers rely on {@code null} here.
     *
     * @return {@code null}
     */
    public <STREAM extends ApexStream<Map.Entry<Object, Integer>>> STREAM countByElement() {
        return null;
    }

    /**
     * Appends an operator that has no output port to continue from.
     *
     * @param operator  the operator to add
     * @param inputPort the operator's input port to connect this stream to
     * @param optionArr options forwarded to {@link #addOperator}
     * @return the stream positioned after the new operator
     */
    @Override
    @SuppressWarnings("unchecked")
    public <O, STREAM extends ApexStream<O>> STREAM endWith(Operator operator, Operator.InputPort<T> inputPort, Option... optionArr) {
        return (STREAM) addOperator(operator, inputPort, null, optionArr);
    }

    /**
     * Adds an operator to the graph, connected to this stream's last output (if any),
     * and returns a stream positioned at the new operator.
     *
     * <p>For {@code ApexStreamImpl} and {@code ApexWindowedStreamImpl} the result comes from
     * {@link #newStream}. For any other subclass the result is created reflectively through a
     * public constructor that takes a single {@code ApexStreamImpl} argument.
     *
     * @param operator   the operator to add; must not be {@code null}
     * @param inputPort  input port of {@code operator} to feed, or {@code null}
     * @param outputPort output port of {@code operator} to continue from, or {@code null}
     * @param optionArr  options passed through to {@link DagMeta#addNode}
     * @return the stream positioned after the new operator
     * @throws IllegalArgumentException if {@code operator} is {@code null} or a given port is not
     *                                  held in a public transient field of {@code operator}
     * @throws RuntimeException         if the subclass cannot be instantiated reflectively
     */
    @Override
    @SuppressWarnings("unchecked")
    public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator operator, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... optionArr) {
        checkArguments(operator, inputPort, outputPort);

        DagMeta.NodeMeta addedNode;
        if (this.lastBrick == null) {
            addedNode = this.graph.addNode(operator, null, null, inputPort, optionArr);
        } else {
            addedNode = this.graph.addNode(operator, this.lastBrick.nodeMeta, this.lastBrick.lastOutput, inputPort, optionArr);
        }

        Brick<O> nextBrick = new Brick<>();
        nextBrick.nodeMeta = addedNode;
        nextBrick.setLastOutput(outputPort);
        if (this.lastBrick != null) {
            // Explicit type witness: the field is declared with raw port types.
            nextBrick.lastStream = Pair.<Operator.OutputPort, Operator.InputPort>of(this.lastBrick.lastOutput, inputPort);
        }

        if (getClass() == ApexStreamImpl.class || getClass() == ApexWindowedStreamImpl.class) {
            return (STREAM) newStream(this.graph, nextBrick);
        }
        try {
            // Custom subclasses wrap the plain stream through their ApexStreamImpl-taking constructor.
            return (STREAM) getClass().getConstructor(ApexStreamImpl.class).newInstance(newStream(this.graph, nextBrick));
        } catch (Exception e) {
            throw new RuntimeException("You have to override the default constructor with ApexStreamImpl as default parameter", e);
        }
    }

    /**
     * Applies a composite transform to this stream.
     *
     * @param compositeStreamTransform the transform to apply
     * @return whatever {@link CompositeStreamTransform#compose} returns for this stream
     */
    @Override
    @SuppressWarnings("unchecked")
    public <O, INSTREAM extends ApexStream<T>, OUTSTREAM extends ApexStream<O>> OUTSTREAM addCompositeStreams(CompositeStreamTransform<INSTREAM, OUTSTREAM> compositeStreamTransform) {
        return compositeStreamTransform.compose((INSTREAM) this);
    }

    /**
     * Verifies that the given ports are the values of public transient fields of the operator.
     * A {@code null} port is treated as "not used" and always passes.
     *
     * @throws IllegalArgumentException if {@code operator} is {@code null} or a non-null port is not found
     */
    private void checkArguments(Operator operator, Operator.InputPort inputPort, Operator.OutputPort outputPort) {
        if (operator == null) {
            throw new IllegalArgumentException("Operator can not be null");
        }
        boolean inputOwned = inputPort == null;
        boolean outputOwned = outputPort == null;
        for (Field field : operator.getClass().getFields()) {
            if (inputOwned && outputOwned) {
                break;
            }
            int modifiers = field.getModifiers();
            if (!Modifier.isPublic(modifiers) || !Modifier.isTransient(modifiers)) {
                continue;
            }
            Object value;
            try {
                value = field.get(operator);
            } catch (IllegalAccessException ignored) {
                // An unreadable field cannot prove ownership; skip it and keep looking.
                continue;
            }
            // Identity comparison on purpose: the port must be the very instance held by the operator.
            outputOwned |= (value == outputPort);
            inputOwned |= (value == inputPort);
        }
        if (!inputOwned || !outputOwned) {
            throw new IllegalArgumentException("Input port " + inputPort + " and/or Output port " + outputPort + " is/are not owned by Operator " + operator);
        }
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <STREAM extends ApexStream<T>> STREAM union(ApexStream<T>... apexStreamArr) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attaches a {@link ConsoleOutputOperator} to this stream's last output.
     *
     * @return this stream (not the stream ending at the console operator)
     */
    @Override
    public ApexStreamImpl<T> print() {
        ConsoleOutputOperator console = new ConsoleOutputOperator();
        addOperator(console, console.input, null, Option.Options.name(IDGenerator.generateOperatorIDWithUUID(console.getClass())));
        return this;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ApexStream<T> printErr() {
        throw new UnsupportedOperationException();
    }

    /**
     * Records an attribute for the last added operator and/or the ports of its incoming stream,
     * depending on which attribute set contains it, and then also records it as a global attribute.
     *
     * <p>Port attributes are only recorded when the last operator has an incoming stream.
     * Output-port attributes are stored on the parent node whose node streams contain the
     * upstream output port, so repeated calls accumulate in one list per port.
     *
     * <p>NOTE(review): {@link #setGlobalAttribute} is invoked for every attribute, including
     * operator and port attributes; confirm that this is intended.
     *
     * @param attribute the attribute to set
     * @param obj       the attribute value
     * @return this stream
     * @throws IllegalStateException if an operator or port attribute is given before any operator was added
     */
    @Override
    public ApexStream<T> with(Attribute attribute, Object obj) {
        if (OPERATOR_ATTRIBUTES.contains(attribute)) {
            requireLastBrick().nodeMeta.operatorAttributes.add(Pair.of(attribute, obj));
        }
        if (INPUT_ATTRIBUTES.contains(attribute) && requireLastBrick().lastStream != null) {
            DagMeta.NodeMeta node = this.lastBrick.nodeMeta;
            Operator.InputPort inputPort = this.lastBrick.lastStream.getRight();
            List<Pair<Attribute, Object>> inputAttrs = node.inputPortAttributes.get(inputPort);
            if (inputAttrs == null) {
                inputAttrs = new LinkedList<>();
                node.inputPortAttributes.put(inputPort, inputAttrs);
            }
            inputAttrs.add(Pair.of(attribute, obj));
        }
        if (OUTPUT_ATTRIBUTES.contains(attribute) && requireLastBrick().lastStream != null) {
            Operator.OutputPort upstreamPort = this.lastBrick.lastStream.getLeft();
            for (DagMeta.NodeMeta parent : this.lastBrick.nodeMeta.getParent()) {
                if (!parent.getNodeStreams().containsKey(upstreamPort)) {
                    // This parent does not own the stream feeding the last operator.
                    continue;
                }
                // Read and write the same (parent) map so earlier attributes are kept.
                List<Pair<Attribute, Object>> outputAttrs = parent.outputPortAttributes.get(upstreamPort);
                if (outputAttrs == null) {
                    outputAttrs = new LinkedList<>();
                    parent.outputPortAttributes.put(upstreamPort, outputAttrs);
                }
                outputAttrs.add(Pair.of(attribute, obj));
            }
        }
        setGlobalAttribute(attribute, obj);
        return this;
    }

    /**
     * Records an attribute in the graph-level attribute list.
     *
     * @param attribute the attribute to set
     * @param obj       the attribute value
     * @return this stream
     */
    @Override
    public ApexStream<T> setGlobalAttribute(Attribute attribute, Object obj) {
        this.graph.dagAttributes.add(Pair.of(attribute, obj));
        return this;
    }

    /**
     * Sets the locality of the stream that feeds the last added operator. Does nothing when the
     * last operator has no incoming stream.
     *
     * @param locality the locality to record
     * @return this stream
     * @throws IllegalStateException if no operator has been added yet
     */
    @Override
    public ApexStream<T> with(DAG.Locality locality) {
        Brick<T> brick = requireLastBrick();
        if (brick.lastStream == null) {
            return this;
        }
        Operator.OutputPort upstreamPort = brick.lastStream.getLeft();
        for (DagMeta.NodeMeta parent : brick.nodeMeta.getParent()) {
            Pair<List<Operator.InputPort>, DAG.Locality> stream = parent.getNodeStreams().get(upstreamPort);
            if (stream != null) {
                stream.setValue(locality);
            }
        }
        return this;
    }

    /**
     * Sets a bean property on the last added operator via {@link BeanUtils#setProperty}.
     *
     * @param str the property name
     * @param obj the property value
     * @return this stream
     * @throws IllegalStateException if no operator has been added yet
     * @throws RuntimeException      wrapping any failure raised while setting the property
     */
    @Override
    public ApexStream<T> with(String str, Object obj) {
        Object target = requireLastBrick().nodeMeta.getOperator();
        try {
            BeanUtils.setProperty(target, str, obj);
            return this;
        } catch (Exception e) {
            throw new RuntimeException("Failed to set property '" + str + "' on " + target, e);
        }
    }

    /**
     * Builds a new {@link LogicalPlan} and populates it from this stream's graph.
     *
     * @return the populated DAG
     */
    @Override
    public DAG createDag() {
        LogicalPlan plan = new LogicalPlan();
        populateDag(plan);
        return plan;
    }

    /**
     * Populates the given DAG from this stream's graph metadata.
     *
     * @param dag the DAG to populate
     */
    @Override
    public void populateDag(DAG dag) {
        this.graph.buildDAG(dag);
    }

    /**
     * Runs the stream in an embedded {@link LocalMode}.
     *
     * @param z        {@code true} to start with {@code runAsync()} and return
     * @param j        when {@code z} is {@code false}: a non-negative value is passed to
     *                 {@code run(long)}, a negative value selects {@code run()}
     * @param callable exit condition, applied only when the controller is a {@link StramLocalCluster}
     */
    @Override
    public void runEmbedded(boolean z, long j, Callable<Boolean> callable) {
        LocalMode localMode = LocalMode.newInstance();
        populateDag(localMode.getDAG());
        LocalMode.Controller controller = localMode.getController();
        if (controller instanceof StramLocalCluster) {
            ((StramLocalCluster) controller).setExitCondition(callable);
        }
        if (z) {
            controller.runAsync();
        } else if (j >= 0) {
            controller.run(j);
        } else {
            controller.run();
        }
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void run() {
        throw new UnsupportedOperationException();
    }

    /** Equivalent to {@code window(windowOption, null, null)}. */
    @Override
    public WindowedStream<T> window(WindowOption windowOption) {
        return window(windowOption, null, null);
    }

    /** Equivalent to {@code window(windowOption, triggerOption, null)}. */
    @Override
    public WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption) {
        return window(windowOption, triggerOption, null);
    }

    /**
     * Creates a windowed view of this stream that shares its graph and position.
     *
     * @param windowOption  the window option to store on the windowed stream
     * @param triggerOption the trigger option to store; may be {@code null}
     * @param duration      the allowed lateness to store; may be {@code null}
     * @return a new {@link ApexWindowedStreamImpl}
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration duration) {
        ApexWindowedStreamImpl windowed = new ApexWindowedStreamImpl();
        windowed.lastBrick = this.lastBrick;
        windowed.graph = this.graph;
        windowed.windowOption = windowOption;
        windowed.triggerOption = triggerOption;
        windowed.allowedLateness = duration;
        return windowed;
    }

    /**
     * Factory for the stream returned by {@link #addOperator}.
     *
     * @param dagMeta the graph the new stream builds on
     * @param brick   the position of the new stream
     * @return a plain {@code ApexStreamImpl} over {@code dagMeta} positioned at {@code brick}
     */
    protected <O> ApexStream<O> newStream(DagMeta dagMeta, Brick<O> brick) {
        // Use the (graph, brick) constructor so no throwaway DagMeta is allocated.
        return new ApexStreamImpl<>(dagMeta, brick);
    }

    /**
     * Returns the current brick, failing with a descriptive error when no operator has been added.
     *
     * @throws IllegalStateException if {@link #lastBrick} is {@code null}
     */
    private Brick<T> requireLastBrick() {
        if (this.lastBrick == null) {
            throw new IllegalStateException("No operator has been added to this stream yet");
        }
        return this.lastBrick;
    }

    /**
     * Adds the value of every static field of type {@link Attribute} declared on {@code holder}
     * to {@code target}. A field that cannot be read is skipped so the rest are still collected.
     */
    private static void collectAttributeConstants(Class<?> holder, Set<Attribute<?>> target) {
        for (Field field : holder.getDeclaredFields()) {
            if (field.getType() != Attribute.class || !Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            try {
                target.add((Attribute<?>) field.get(null));
            } catch (IllegalAccessException ignored) {
                // Best effort: an inaccessible constant is simply not recognised by with(Attribute, Object).
            }
        }
    }
}
