package org.apache.rya.streams.kafka.processors.filter;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.rya.api.function.filter.FilterEvaluator;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies {@link FilterProcessor} instances that all share one {@link FilterEvaluator}.
 */
@DefaultAnnotation({NonNull.class})
public class FilterProcessorSupplier extends RyaStreamsProcessorSupplier {
    private static final Logger log = LoggerFactory.getLogger(FilterProcessorSupplier.class);

    /** Evaluator handed to every processor created by {@link #get()}. */
    private final FilterEvaluator filter;

    /**
     * Creates a supplier for the given filter.
     *
     * @param filterEvaluator the evaluator each supplied processor will use; must not be null
     * @param processorResultFactory passed through to the superclass constructor
     * @throws NullPointerException if {@code filterEvaluator} is null
     */
    public FilterProcessorSupplier(FilterEvaluator filterEvaluator, ProcessorResultFactory processorResultFactory) {
        super(processorResultFactory);
        filter = Objects.requireNonNull(filterEvaluator);
    }

    /**
     * Builds a new {@link FilterProcessor} from this supplier's evaluator and the
     * superclass's result factory.
     *
     * @return a freshly constructed {@link FilterProcessor}
     */
    public Processor<Object, ProcessorResult> get() {
        final ProcessorResultFactory resultFactory = super.getResultFactory();
        return new FilterProcessor(filter, resultFactory);
    }

    /**
     * Passes the binding set of a unary {@link ProcessorResult} through a
     * {@link FilterEvaluator} and forwards it via the {@link ProcessorContext}
     * only when the evaluator accepts it.
     */
    @DefaultAnnotation({NonNull.class})
    public static class FilterProcessor extends RyaStreamsProcessor {
        private final FilterEvaluator filter;

        /** Assigned in {@link #init(ProcessorContext)}; used by {@code process} to forward results. */
        private ProcessorContext context;

        /**
         * Creates a processor around the given filter.
         *
         * @param filterEvaluator the evaluator applied to each incoming binding set; must not be null
         * @param processorResultFactory passed through to the superclass constructor
         * @throws NullPointerException if {@code filterEvaluator} is null
         */
        public FilterProcessor(FilterEvaluator filterEvaluator, ProcessorResultFactory processorResultFactory) {
            super(processorResultFactory);
            filter = Objects.requireNonNull(filterEvaluator);
        }

        /**
         * Stores the context so that {@code process} can forward through it.
         *
         * @param processorContext the context to forward accepted results to
         */
        public void init(ProcessorContext processorContext) {
            context = processorContext;
        }

        /**
         * Filters the binding set carried by a unary result and forwards it, under the
         * same key, when the filter accepts it. Rejected binding sets are dropped.
         *
         * @param obj the key that is forwarded unchanged alongside an accepted result
         * @param processorResult the result to inspect; its type must be {@code UNARY}
         * @throws RuntimeException if {@code processorResult} is not of type {@code UNARY}
         */
        public void process(Object obj, ProcessorResult processorResult) {
            final boolean isUnary = processorResult.getType() == ProcessorResult.ResultType.UNARY;
            if (!isUnary) {
                throw new RuntimeException("The ProcessorResult to be processed must be Unary.");
            }

            final VisibilityBindingSet bindingSet = processorResult.getUnary().getResult();
            FilterProcessorSupplier.log.debug("\nINPUT:\n{}", bindingSet);

            // Nothing is forwarded for binding sets the filter rejects.
            if (!filter.filter(bindingSet)) {
                return;
            }

            FilterProcessorSupplier.log.debug("\nOUTPUT:\n{}", bindingSet);
            context.forward(obj, super.getResultFactory().make(bindingSet));
        }

        /**
         * No-op.
         *
         * @param j unused
         */
        public void punctuate(long j) {
            // Intentionally empty.
        }

        /** No-op. */
        public void close() {
            // Intentionally empty.
        }
    }
}
