package org.apache.rya.streams.kafka.processors.join;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.rya.api.function.join.IterativeJoin;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies {@link JoinProcessor} instances that join incoming binary results
 * against the values held in a named state store.
 * <p>
 * Every processor handed out by {@link #get()} is configured with the same
 * state store name, join implementation and variable lists that were given
 * to this supplier.
 */
@DefaultAnnotation({NonNull.class})
public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier {
    private final String stateStoreName;
    private final IterativeJoin join;
    private final List<String> joinVars;
    private final List<String> allVars;

    /**
     * Stores each incoming binary result in a {@link JoinStateStore}, joins it
     * with the values the store returns for it, and forwards every joined
     * binding set downstream.
     */
    @DefaultAnnotation({NonNull.class})
    public static class JoinProcessor extends RyaStreamsProcessor {
        private static final Logger log = LoggerFactory.getLogger(JoinProcessor.class);
        private final String stateStoreName;
        private final IterativeJoin join;
        private final List<String> joinVars;
        private final List<String> allVars;
        private final ProcessorResultFactory resultFactory;
        private ProcessorContext context;
        private JoinStateStore joinStateStore;

        /**
         * Constructs a processor.
         *
         * @param stateStoreName name of the state store looked up in {@link #init(ProcessorContext)}
         * @param join join implementation applied to new left/right results
         * @param joinVars variables the join is performed over; must be a prefix of {@code allVars}
         * @param allVars every variable, beginning with the join variables in the same order
         * @param resultFactory wraps each joined binding set before it is forwarded
         * @throws NullPointerException if any argument is {@code null}
         * @throws IllegalArgumentException if {@code allVars} does not begin with {@code joinVars}
         */
        public JoinProcessor(String stateStoreName, IterativeJoin join, List<String> joinVars, List<String> allVars, ProcessorResultFactory resultFactory) {
            super(resultFactory);
            this.stateStoreName = Objects.requireNonNull(stateStoreName);
            this.join = Objects.requireNonNull(join);
            this.joinVars = Objects.requireNonNull(joinVars);
            this.allVars = Objects.requireNonNull(allVars);
            this.resultFactory = Objects.requireNonNull(resultFactory);
            if (!JoinProcessorSupplier.startsWith(allVars, joinVars)) {
                throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. Join Vars: " + joinVars + ", All Vars: " + allVars);
            }
        }

        /**
         * Remembers the context and builds the join state store on top of the
         * state store registered under this processor's state store name.
         *
         * @param processorContext context used to look up the state store and forward results
         */
        public void init(ProcessorContext processorContext) {
            this.context = processorContext;
            this.joinStateStore = new KeyValueJoinStateStore(processorContext.getStateStore(this.stateStoreName), this.joinVars, this.allVars);
        }

        /**
         * Stores the binary result, joins it with the values the state store
         * returns for it, and forwards each joined binding set under the same key.
         *
         * @param obj key of the incoming record; reused as the key of every forwarded result
         * @param processorResult incoming result; its binary form is what gets stored and joined
         * @throws RuntimeException wrapping any exception raised while fetching, iterating,
         *         forwarding or closing the joined values
         */
        public void process(Object obj, ProcessorResult processorResult) {
            final ProcessorResult.BinaryResult binary = processorResult.getBinary();
            log.debug("\nINPUT:\nSide: {}\nBinding Set: {}", binary.getSide(), binary.getResult());

            // Store first so that later values arriving on the other side can join with this one.
            this.joinStateStore.store(binary);

            // try-with-resources guarantees the iterator is closed even when the join or the
            // forward fails part-way through the iteration.
            try (CloseableIterator<VisibilityBindingSet> joinedValues = this.joinStateStore.getJoinedValues(binary)) {
                final Iterator<VisibilityBindingSet> joinResults;
                if (binary.getSide() == ProcessorResult.BinaryResult.Side.LEFT) {
                    joinResults = this.join.newLeftResult(binary.getResult(), joinedValues);
                } else {
                    joinResults = this.join.newRightResult(joinedValues, binary.getResult());
                }

                while (joinResults.hasNext()) {
                    final VisibilityBindingSet joined = joinResults.next();
                    final ProcessorResult output = this.resultFactory.make(joined);
                    log.debug("\nOUTPUT:\n{}", joined);
                    this.context.forward(obj, output);
                }
            } catch (Exception e) {
                final String msg = "Problem encountered while iterating over the other side's values within the state store.";
                log.error(msg, e);
                throw new RuntimeException(msg, e);
            }
        }

        /**
         * Does nothing.
         *
         * @param j unused
         */
        public void punctuate(long j) {
        }

        /**
         * Does nothing; this processor holds no resources between calls.
         */
        public void close() {
        }
    }

    /**
     * Constructs a supplier.
     *
     * @param stateStoreName name of the state store each supplied processor will use
     * @param join join implementation each supplied processor will apply
     * @param joinVars variables the join is performed over; must be a prefix of {@code allVars}
     * @param allVars every variable, beginning with the join variables in the same order
     * @param resultFactory passed to the superclass and to each supplied processor
     * @throws NullPointerException if {@code stateStoreName}, {@code join}, {@code joinVars}
     *         or {@code allVars} is {@code null}
     * @throws IllegalArgumentException if {@code allVars} does not begin with {@code joinVars}
     */
    public JoinProcessorSupplier(String stateStoreName, IterativeJoin join, List<String> joinVars, List<String> allVars, ProcessorResultFactory resultFactory) throws IllegalArgumentException {
        super(resultFactory);
        this.stateStoreName = Objects.requireNonNull(stateStoreName);
        this.join = Objects.requireNonNull(join);
        this.joinVars = Objects.requireNonNull(joinVars);
        this.allVars = Objects.requireNonNull(allVars);
        if (!startsWith(allVars, joinVars)) {
            throw new IllegalArgumentException("The allVars list must start with the joinVars list, but it did not. Join Vars: " + joinVars + ", All Vars: " + allVars);
        }
    }

    /**
     * Creates a new {@link JoinProcessor} configured with this supplier's settings.
     *
     * @return a freshly constructed processor
     */
    public Processor<Object, ProcessorResult> get() {
        return new JoinProcessor(this.stateStoreName, this.join, this.joinVars, this.allVars, super.getResultFactory());
    }

    /**
     * Tells whether {@code prefix} appears, element for element, at the start of {@code list}.
     * The size guard keeps {@code subList} from throwing when the prefix is the longer list.
     */
    private static boolean startsWith(List<String> list, List<String> prefix) {
        return prefix.size() <= list.size() && list.subList(0, prefix.size()).equals(prefix);
    }
}
