package org.apache.rya.streams.kafka.topology;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.rya.api.function.filter.FilterEvaluator;
import org.apache.rya.api.function.join.IterativeJoin;
import org.apache.rya.api.function.join.LeftOuterJoin;
import org.apache.rya.api.function.join.NaturalJoin;
import org.apache.rya.api.function.projection.BNodeIdFactory;
import org.apache.rya.api.function.projection.MultiProjectionEvaluator;
import org.apache.rya.api.function.projection.ProjectionEvaluator;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier;
import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier;
import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier;
import org.apache.rya.streams.kafka.processors.output.StatementOutputFormatterSupplier;
import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.BinaryTupleOperator;
import org.openrdf.query.algebra.Extension;
import org.openrdf.query.algebra.Filter;
import org.openrdf.query.algebra.Group;
import org.openrdf.query.algebra.Join;
import org.openrdf.query.algebra.LeftJoin;
import org.openrdf.query.algebra.MultiProjection;
import org.openrdf.query.algebra.Projection;
import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.Reduced;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/streams/kafka/topology/TopologyFactory.class */
public class TopologyFactory implements TopologyBuilderFactory {
    private static final String SOURCE = "SOURCE";
    private static final String STATEMENT_PATTERN_PREFIX = "SP_";
    private static final String JOIN_PREFIX = "JOIN_";
    private static final String PROJECTION_PREFIX = "PROJECTION_";
    private static final String FILTER_PREFIX = "FILTER_";
    private static final String AGGREGATION_PREFIX = "AGGREGATION_";
    private static final String SINK = "SINK";
    private List<ProcessorEntry> processorEntryList;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rya/streams/kafka/topology/TopologyFactory$ProcessorEntry.class */
    public static final class ProcessorEntry {
        private final TupleExpr node;
        private final String id;
        private final Optional<ProcessorResult.BinaryResult.Side> downstreamSide;
        private final ProcessorSupplier<?, ?> supplier;
        private final List<TupleExpr> upstreamNodes;

        public ProcessorEntry(TupleExpr tupleExpr, String str, Optional<ProcessorResult.BinaryResult.Side> optional, ProcessorSupplier<?, ?> processorSupplier, List<TupleExpr> list) {
            this.node = (TupleExpr) Objects.requireNonNull(tupleExpr);
            this.id = (String) Objects.requireNonNull(str);
            this.downstreamSide = (Optional) Objects.requireNonNull(optional);
            this.supplier = (ProcessorSupplier) Objects.requireNonNull(processorSupplier);
            this.upstreamNodes = (List) Objects.requireNonNull(list);
        }

        public TupleExpr getNode() {
            return this.node;
        }

        public Optional<ProcessorResult.BinaryResult.Side> getDownstreamSide() {
            return this.downstreamSide;
        }

        public List<TupleExpr> getUpstreamNodes() {
            return this.upstreamNodes;
        }

        public String getID() {
            return this.id;
        }

        public ProcessorSupplier<?, ?> getSupplier() {
            return this.supplier;
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof ProcessorEntry)) {
                return false;
            }
            ProcessorEntry processorEntry = (ProcessorEntry) obj;
            return Objects.equals(this.node, processorEntry.node) && Objects.equals(this.id, processorEntry.id) && Objects.equals(this.downstreamSide, processorEntry.downstreamSide) && Objects.equals(this.supplier, processorEntry.supplier) && Objects.equals(this.upstreamNodes, processorEntry.upstreamNodes);
        }

        public int hashCode() {
            return Objects.hash(this.node, this.downstreamSide, this.upstreamNodes, this.id, this.supplier);
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("ID: " + this.id + "\n");
            if (this.downstreamSide.isPresent()) {
                sb.append("***********************************\n");
                sb.append("SIDE: " + this.downstreamSide.get() + "\n");
            }
            sb.append("***********************************\n");
            sb.append("PARENTS: ");
            Iterator<TupleExpr> it = this.upstreamNodes.iterator();
            while (it.hasNext()) {
                sb.append(it.next().toString() + ",");
            }
            sb.append("\n***********************************\n");
            sb.append("NODE: " + this.node.toString());
            sb.append("\n");
            return sb.toString();
        }
    }

    /* loaded from: input_file:org/apache/rya/streams/kafka/topology/TopologyFactory$QueryVisitor.class */
    static final class QueryVisitor extends QueryModelVisitorBase<TopologyBuilderFactory.TopologyBuilderException> {
        private final List<ProcessorEntry> entries = new ArrayList();
        private final Map<TupleExpr, String> idMap = new HashMap();
        private SinkEntry<?, ?> sinkEntry = new SinkEntry<>(new BindingSetOutputFormatterSupplier(), new StringSerializer(), new VisibilityBindingSetSerializer());
        private final BNodeIdFactory bNodeIdFactory;

        public QueryVisitor(BNodeIdFactory bNodeIdFactory) {
            this.bNodeIdFactory = (BNodeIdFactory) Objects.requireNonNull(bNodeIdFactory);
        }

        public List<ProcessorEntry> getProcessorEntryList() {
            return this.entries;
        }

        public Map<TupleExpr, String> getIDs() {
            return this.idMap;
        }

        public SinkEntry<?, ?> getSinkEntry() {
            return this.sinkEntry;
        }

        public void meet(Reduced reduced) throws TopologyBuilderFactory.TopologyBuilderException {
            this.sinkEntry = new SinkEntry<>(new StatementOutputFormatterSupplier(), new StringSerializer(), new VisibilityStatementSerializer());
            super.meet(reduced);
        }

        public void meet(StatementPattern statementPattern) throws TopologyBuilderFactory.TopologyBuilderException {
            String str = TopologyFactory.STATEMENT_PATTERN_PREFIX + UUID.randomUUID();
            Optional<ProcessorResult.BinaryResult.Side> side = getSide(statementPattern);
            this.entries.add(new ProcessorEntry(statementPattern, str, side, new StatementPatternProcessorSupplier(statementPattern, visibilityBindingSet -> {
                return getResult(side, visibilityBindingSet);
            }), Lists.newArrayList()));
            this.idMap.put(statementPattern, str);
            super.meet(statementPattern);
        }

        public void meet(Projection projection) throws TopologyBuilderFactory.TopologyBuilderException {
            String str = TopologyFactory.PROJECTION_PREFIX + UUID.randomUUID();
            Optional<ProcessorResult.BinaryResult.Side> side = getSide(projection);
            TupleExpr arg = projection.getArg();
            if (arg instanceof Extension) {
                arg = ((Extension) arg).getArg();
            }
            this.entries.add(new ProcessorEntry(projection, str, side, new ProjectionProcessorSupplier(ProjectionEvaluator.make(projection), visibilityBindingSet -> {
                return getResult(side, visibilityBindingSet);
            }), Lists.newArrayList(new TupleExpr[]{arg})));
            this.idMap.put(projection, str);
            super.meet(projection);
        }

        public void meet(MultiProjection multiProjection) throws TopologyBuilderFactory.TopologyBuilderException {
            String str = TopologyFactory.PROJECTION_PREFIX + UUID.randomUUID();
            Optional<ProcessorResult.BinaryResult.Side> side = getSide(multiProjection);
            MultiProjectionProcessorSupplier multiProjectionProcessorSupplier = new MultiProjectionProcessorSupplier(MultiProjectionEvaluator.make(multiProjection, this.bNodeIdFactory), visibilityBindingSet -> {
                return getResult(side, visibilityBindingSet);
            });
            TupleExpr arg = multiProjection.getArg();
            if (arg instanceof Extension) {
                arg = ((Extension) arg).getArg();
            }
            this.entries.add(new ProcessorEntry(multiProjection, str, side, multiProjectionProcessorSupplier, Lists.newArrayList(new TupleExpr[]{arg})));
            this.idMap.put(multiProjection, str);
            super.meet(multiProjection);
        }

        public void meet(Filter filter) throws TopologyBuilderFactory.TopologyBuilderException {
            String str = TopologyFactory.FILTER_PREFIX + UUID.randomUUID();
            Optional<ProcessorResult.BinaryResult.Side> side = getSide(filter);
            this.entries.add(new ProcessorEntry(filter, str, side, new FilterProcessorSupplier(FilterEvaluator.make(filter), visibilityBindingSet -> {
                return getResult(side, visibilityBindingSet);
            }), Lists.newArrayList(new TupleExpr[]{filter.getArg()})));
            this.idMap.put(filter, str);
            super.meet(filter);
        }

        public void meet(Join join) throws TopologyBuilderFactory.TopologyBuilderException {
            meetJoin(TopologyFactory.JOIN_PREFIX + UUID.randomUUID(), new NaturalJoin(), join);
            super.meet(join);
        }

        public void meet(LeftJoin leftJoin) throws TopologyBuilderFactory.TopologyBuilderException {
            meetJoin(TopologyFactory.JOIN_PREFIX + UUID.randomUUID(), new LeftOuterJoin(), leftJoin);
            super.meet(leftJoin);
        }

        public void meet(Group group) throws TopologyBuilderFactory.TopologyBuilderException {
            String str = TopologyFactory.AGGREGATION_PREFIX + UUID.randomUUID();
            Optional<ProcessorResult.BinaryResult.Side> side = getSide(group);
            this.entries.add(new ProcessorEntry(group, str, side, new AggregationProcessorSupplier(str, group, visibilityBindingSet -> {
                return getResult(side, visibilityBindingSet);
            }), Lists.newArrayList(new TupleExpr[]{group.getArg()})));
            this.idMap.put(group, str);
            super.meet(group);
        }

        private Optional<ProcessorResult.BinaryResult.Side> getSide(QueryModelNode queryModelNode) {
            return queryModelNode.getParentNode() instanceof BinaryTupleOperator ? queryModelNode.equals(queryModelNode.getParentNode().getLeftArg()) ? Optional.of(ProcessorResult.BinaryResult.Side.LEFT) : Optional.of(ProcessorResult.BinaryResult.Side.RIGHT) : Optional.empty();
        }

        private void meetJoin(String str, IterativeJoin iterativeJoin, BinaryTupleOperator binaryTupleOperator) {
            Set bindingNames = binaryTupleOperator.getLeftArg().getBindingNames();
            Set bindingNames2 = binaryTupleOperator.getRightArg().getBindingNames();
            ArrayList newArrayList = Lists.newArrayList(Sets.intersection(bindingNames, bindingNames2));
            bindingNames.removeAll(newArrayList);
            bindingNames2.removeAll(newArrayList);
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(bindingNames);
            arrayList.addAll(bindingNames2);
            newArrayList.sort(Comparator.naturalOrder());
            arrayList.sort(Comparator.naturalOrder());
            ArrayList arrayList2 = new ArrayList();
            arrayList2.addAll(newArrayList);
            arrayList2.addAll(arrayList);
            Optional<ProcessorResult.BinaryResult.Side> side = getSide(binaryTupleOperator);
            this.entries.add(new ProcessorEntry(binaryTupleOperator, str, side, new JoinProcessorSupplier(str, iterativeJoin, newArrayList, arrayList2, visibilityBindingSet -> {
                return getResult(side, visibilityBindingSet);
            }), Lists.newArrayList(new TupleExpr[]{binaryTupleOperator.getLeftArg(), binaryTupleOperator.getRightArg()})));
            this.idMap.put(binaryTupleOperator, str);
        }

        private ProcessorResult getResult(Optional<ProcessorResult.BinaryResult.Side> optional, VisibilityBindingSet visibilityBindingSet) {
            return optional.isPresent() ? ProcessorResult.make(new ProcessorResult.BinaryResult(optional.get(), visibilityBindingSet)) : ProcessorResult.make(new ProcessorResult.UnaryResult(visibilityBindingSet));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rya/streams/kafka/topology/TopologyFactory$SinkEntry.class */
    public static final class SinkEntry<K, V> {
        private final ProcessorSupplier<Object, ProcessorResult> formatterSupplier;
        private final Serializer<K> keySerializer;
        private final Serializer<V> valueSerializer;

        public SinkEntry(ProcessorSupplier<Object, ProcessorResult> processorSupplier, Serializer<K> serializer, Serializer<V> serializer2) {
            this.keySerializer = (Serializer) Objects.requireNonNull(serializer);
            this.valueSerializer = (Serializer) Objects.requireNonNull(serializer2);
            this.formatterSupplier = (ProcessorSupplier) Objects.requireNonNull(processorSupplier);
        }

        public ProcessorSupplier<Object, ProcessorResult> getFormatterSupplier() {
            return this.formatterSupplier;
        }

        public Serializer<K> getKeySerializer() {
            return this.keySerializer;
        }

        public Serializer<V> getValueSerializer() {
            return this.valueSerializer;
        }
    }

    @Override // org.apache.rya.streams.kafka.topology.TopologyBuilderFactory
    public TopologyBuilder build(String str, String str2, String str3, BNodeIdFactory bNodeIdFactory) throws MalformedQueryException, TopologyBuilderFactory.TopologyBuilderException {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        ParsedQuery parseQuery = new SPARQLParser().parseQuery(str, (String) null);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        TupleExpr tupleExpr = parseQuery.getTupleExpr();
        QueryVisitor queryVisitor = new QueryVisitor(bNodeIdFactory);
        tupleExpr.visit(queryVisitor);
        this.processorEntryList = queryVisitor.getProcessorEntryList();
        Map<TupleExpr, String> iDs = queryVisitor.getIDs();
        topologyBuilder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), new String[]{str2});
        ProcessorEntry processorEntry = null;
        for (int size = this.processorEntryList.size() - 1; size >= 0; size--) {
            processorEntry = this.processorEntryList.get(size);
            if (processorEntry.getNode() instanceof StatementPattern) {
                topologyBuilder.addProcessor(processorEntry.getID(), processorEntry.getSupplier(), new String[]{SOURCE});
            } else {
                List<TupleExpr> upstreamNodes = processorEntry.getUpstreamNodes();
                String[] strArr = new String[upstreamNodes.size()];
                for (int i = 0; i < upstreamNodes.size(); i++) {
                    strArr[i] = iDs.get(upstreamNodes.get(i));
                }
                topologyBuilder.addProcessor(processorEntry.getID(), processorEntry.getSupplier(), strArr);
            }
            if ((processorEntry.getNode() instanceof Join) || (processorEntry.getNode() instanceof LeftJoin) || (processorEntry.getNode() instanceof Group)) {
                topologyBuilder.addStateStore(Stores.create(processorEntry.getID()).withStringKeys().withValues(new VisibilityBindingSetSerde()).persistent().build(), new String[]{processorEntry.getID()});
            }
        }
        SinkEntry<?, ?> sinkEntry = queryVisitor.getSinkEntry();
        topologyBuilder.addProcessor("OUTPUT_FORMATTER", sinkEntry.getFormatterSupplier(), new String[]{processorEntry.getID()});
        topologyBuilder.addSink(SINK, str3, sinkEntry.getKeySerializer(), sinkEntry.getValueSerializer(), new String[]{"OUTPUT_FORMATTER"});
        return topologyBuilder;
    }

    @VisibleForTesting
    public List<ProcessorEntry> getProcessorEntry() {
        return this.processorEntryList;
    }
}
