/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies processors that join every incoming record of one stream against the
 * records held for the same key in the other stream's window store.
 *
 * <p>For an input record with timestamp {@code ts}, the other store is queried for the
 * time range {@code [max(0, ts - joinBeforeMs), max(0, ts + joinAfterMs)]}. One result
 * is forwarded per match. If {@code outer} is set and there is no match, a single
 * result built from the input value and {@code null} is forwarded instead.
 *
 * @param <K>  key type shared by both streams
 * @param <R>  type of the joined result value
 * @param <V1> value type of the stream this processor consumes
 * @param <V2> value type stored in the other stream's window store
 */
class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);

    private final String otherWindowName;
    private final long joinBeforeMs;
    private final long joinAfterMs;
    private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
    private final boolean outer;

    /**
     * @param otherWindowName name of the state store holding the other stream's records
     * @param joinBeforeMs    milliseconds subtracted from the input timestamp to get the lower fetch bound
     * @param joinAfterMs     milliseconds added to the input timestamp to get the upper fetch bound
     * @param joiner          combines the input value with a matching value from the other store
     * @param outer           if {@code true}, a record without any match is still joined (against {@code null})
     */
    KStreamKStreamJoin(final String otherWindowName,
                       final long joinBeforeMs,
                       final long joinAfterMs,
                       final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
                       final boolean outer) {
        this.otherWindowName = otherWindowName;
        this.joinBeforeMs = joinBeforeMs;
        this.joinAfterMs = joinAfterMs;
        this.joiner = joiner;
        this.outer = outer;
    }

    /**
     * Creates a new join processor bound to this supplier's configuration.
     *
     * @return a fresh processor instance
     */
    @Override
    public Processor<K, V1> get() {
        return new KStreamKStreamJoinProcessor();
    }

    /**
     * Processor performing the lookup in the other window store and forwarding join results.
     */
    private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
        private WindowStore<K, V2> otherWindow;
        private StreamsMetricsImpl metrics;
        private Sensor droppedRecordsSensor;

        /**
         * Resolves the dropped-records sensor and the other stream's window store from the context.
         *
         * @param context processor context supplied by the runtime
         */
        // The state store is looked up by name only, so its generic type cannot be checked;
        // the cast below is the single unchecked operation in this class.
        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            metrics = (StreamsMetricsImpl) context.metrics();
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                Thread.currentThread().getName(),
                context.taskId().toString(),
                metrics);
            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
        }

        /**
         * Joins one input record against the other window store.
         *
         * <p>Records with a {@code null} key or {@code null} value are logged, counted on the
         * dropped-records sensor and otherwise ignored.
         *
         * @param key   record key
         * @param value record value
         */
        @Override
        public void process(final K key, final V1 value) {
            if (key == null || value == null) {
                LOG.warn(
                    "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                    key, value, context().topic(), context().partition(), context().offset());
                droppedRecordsSensor.record();
                return;
            }

            // Cleared as soon as at least one match is found in the other store.
            boolean needOuterJoin = outer;

            final long inputRecordTimestamp = context().timestamp();
            // Both bounds are clamped so that a negative timestamp is never passed to fetch().
            // NOTE(review): the additions/subtractions are plain long arithmetic; for extreme
            // joinBeforeMs/joinAfterMs values they could wrap and be clamped to 0 - confirm
            // whether callers can supply such values.
            final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
            final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

            try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                while (iter.hasNext()) {
                    needOuterJoin = false;
                    final KeyValue<Long, V2> otherRecord = iter.next();
                    // The result carries the larger of the input timestamp and the iterator's Long key.
                    context().forward(
                        key,
                        joiner.apply(value, otherRecord.value),
                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
                }

                if (needOuterJoin) {
                    context().forward(key, joiner.apply(value, null));
                }
            }
        }
    }
}

