package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.1.1.jar:org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.class */
public class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KStreamKStreamJoin.class);
    private final String otherWindowName;
    private final long joinBeforeMs;
    private final long joinAfterMs;
    private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
    private final boolean outer;

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.1.1.jar:org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin$KStreamKStreamJoinProcessor.class */
    private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
        private WindowStore<K, V2> otherWindow;
        private StreamsMetricsImpl metrics;

        private KStreamKStreamJoinProcessor() {
        }

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.metrics = (StreamsMetricsImpl) processorContext.metrics();
            this.otherWindow = (WindowStore) processorContext.getStateStore(KStreamKStreamJoin.this.otherWindowName);
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void process(K k, V1 v1) {
            if (k == null || v1 == null) {
                KStreamKStreamJoin.LOG.warn("Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", k, v1, context().topic(), Integer.valueOf(context().partition()), Long.valueOf(context().offset()));
                this.metrics.skippedRecordsSensor().record();
                return;
            }
            boolean z = KStreamKStreamJoin.this.outer;
            WindowStoreIterator<V2> fetch = this.otherWindow.fetch((WindowStore<K, V2>) k, Math.max(0L, context().timestamp() - KStreamKStreamJoin.this.joinBeforeMs), Math.max(0L, context().timestamp() + KStreamKStreamJoin.this.joinAfterMs));
            Throwable th = null;
            while (fetch.hasNext()) {
                try {
                    try {
                        z = false;
                        context().forward(k, KStreamKStreamJoin.this.joiner.apply(v1, ((KeyValue) fetch.next()).value));
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (fetch != null) {
                        if (th != null) {
                            try {
                                fetch.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    throw th3;
                }
            }
            if (z) {
                context().forward(k, KStreamKStreamJoin.this.joiner.apply(v1, null));
            }
            if (fetch != null) {
                if (0 == 0) {
                    fetch.close();
                    return;
                }
                try {
                    fetch.close();
                } catch (Throwable th5) {
                    th.addSuppressed(th5);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KStreamKStreamJoin(String str, long j, long j2, ValueJoiner<? super V1, ? super V2, ? extends R> valueJoiner, boolean z) {
        this.otherWindowName = str;
        this.joinBeforeMs = j;
        this.joinAfterMs = j2;
        this.joiner = valueJoiner;
        this.outer = z;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorSupplier
    public Processor<K, V1> get() {
        return new KStreamKStreamJoinProcessor();
    }
}
