package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.4.0.jar:org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.class */
public class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KTableKTableOuterJoin.class);

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.4.0.jar:org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.class */
    /**
     * Processor for {@code Change<V1>} records of the outer join.
     *
     * <p>For each incoming change it looks up the {@code V2} value stored under the same key via
     * the supplied value getter, applies the enclosing join's {@code joiner}, and forwards the
     * joined {@link Change} downstream. Records with a {@code null} key are logged, counted on the
     * skipped-records sensor, and dropped.
     */
    private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change<V1>> {
        private final KTableValueGetter<K, V2> valueGetter;
        private StreamsMetricsImpl metrics;
        private Sensor skippedRecordsSensor;

        /**
         * @param kTableValueGetter getter used to look up the {@code V2} value for a key
         */
        KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> kTableValueGetter) {
            this.valueGetter = kTableValueGetter;
        }

        /**
         * Initializes the base processor, obtains the skipped-records sensor for the current
         * thread, and initializes the value getter with the same context.
         *
         * @param processorContext the context this processor runs in
         */
        @Override
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.metrics = (StreamsMetricsImpl) processorContext.metrics();
            this.skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), this.metrics);
            this.valueGetter.init(processorContext);
        }

        /**
         * Joins one change record against the value looked up for the same key and forwards the
         * result.
         *
         * @param k      the record key; a {@code null} key causes the record to be skipped
         * @param change the new/old value pair for this key
         */
        @Override
        public void process(K k, Change<V1> change) {
            if (k == null) {
                KTableKTableOuterJoin.LOG.warn("Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]", change, context().topic(), context().partition(), context().offset());
                this.skippedRecordsSensor.record();
                return;
            }

            final ValueAndTimestamp<V2> otherValueAndTimestamp = this.valueGetter.get(k);
            final V2 otherValue = ValueAndTimestamp.getValueOrNull(otherValueAndTimestamp);

            final long resultTimestamp;
            if (otherValue == null) {
                // No value on either side, before or after the change: nothing to emit.
                if (change.newValue == null && change.oldValue == null) {
                    return;
                }
                resultTimestamp = context().timestamp();
            } else {
                // The result carries the larger of the record's and the looked-up value's timestamp.
                resultTimestamp = Math.max(context().timestamp(), otherValueAndTimestamp.timestamp());
            }

            // The joiner is only applied when at least one of its two inputs is non-null.
            R joinedNew = null;
            if (otherValue != null || change.newValue != null) {
                joinedNew = KTableKTableOuterJoin.this.joiner.apply(change.newValue, otherValue);
            }

            // The old joined value is only computed when sendOldValues is set.
            R joinedOld = null;
            if (KTableKTableOuterJoin.this.sendOldValues && (otherValue != null || change.oldValue != null)) {
                joinedOld = KTableKTableOuterJoin.this.joiner.apply(change.oldValue, otherValue);
            }

            // Forward the key itself; the compiler generates the process(Object, Object) bridge.
            context().forward(k, new Change<>(joinedNew, joinedOld), To.all().withTimestamp(resultTimestamp));
        }

        /** Closes the value getter. */
        @Override
        public void close() {
            this.valueGetter.close();
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.4.0.jar:org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin$KTableKTableOuterJoinValueGetter.class */
    /**
     * Value getter that computes the outer-join result for a key on demand by reading both
     * underlying getters and applying the enclosing join's {@code joiner}.
     */
    private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R> {
        private final KTableValueGetter<K, V1> valueGetter1;
        private final KTableValueGetter<K, V2> valueGetter2;

        KTableKTableOuterJoinValueGetter(final KTableValueGetter<K, V1> kTableValueGetter, final KTableValueGetter<K, V2> kTableValueGetter2) {
            valueGetter1 = kTableValueGetter;
            valueGetter2 = kTableValueGetter2;
        }

        /** Initializes both underlying getters with the given context, first then second. */
        @Override
        public void init(final ProcessorContext processorContext) {
            valueGetter1.init(processorContext);
            valueGetter2.init(processorContext);
        }

        /**
         * Looks up both sides for {@code k} and joins them.
         *
         * <p>A side with no entry contributes a {@code null} value and a timestamp of {@code -1}.
         * The joiner is applied only when at least one side has a non-null value; the result is
         * paired with the larger of the two timestamps via {@link ValueAndTimestamp#make}.
         */
        @Override
        public ValueAndTimestamp<R> get(final K k) {
            final ValueAndTimestamp<V1> first = valueGetter1.get(k);
            final V1 firstValue = first == null ? null : first.value();
            final long firstTimestamp = first == null ? -1L : first.timestamp();

            final ValueAndTimestamp<V2> second = valueGetter2.get(k);
            final V2 secondValue = second == null ? null : second.value();
            final long secondTimestamp = second == null ? -1L : second.timestamp();

            final boolean bothAbsent = firstValue == null && secondValue == null;
            final R joined = bothAbsent ? null : KTableKTableOuterJoin.this.joiner.apply(firstValue, secondValue);

            return ValueAndTimestamp.make(joined, Math.max(firstTimestamp, secondTimestamp));
        }

        /** Closes both underlying getters, first then second. */
        @Override
        public void close() {
            valueGetter1.close();
            valueGetter2.close();
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.4.0.jar:org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin$KTableKTableOuterJoinValueGetterSupplier.class */
    /**
     * Supplier of {@link KTableKTableOuterJoinValueGetter} instances, each backed by a fresh
     * getter obtained from the two wrapped suppliers.
     */
    private class KTableKTableOuterJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
        KTableKTableOuterJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> kTableValueGetterSupplier, final KTableValueGetterSupplier<K, V2> kTableValueGetterSupplier2) {
            super(kTableValueGetterSupplier, kTableValueGetterSupplier2);
        }

        /** Obtains one getter from each wrapped supplier and combines them into a join getter. */
        @Override
        public KTableValueGetter<K, R> get() {
            final KTableValueGetter<K, V1> firstGetter = valueGetterSupplier1.get();
            final KTableValueGetter<K, V2> secondGetter = valueGetterSupplier2.get();
            return new KTableKTableOuterJoinValueGetter(firstGetter, secondGetter);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the outer join by handing both tables and the joiner to the abstract join base class.
     *
     * @param kTableImpl  table supplying {@code V1} values
     * @param kTableImpl2 table supplying {@code V2} values
     * @param valueJoiner function combining a {@code V1} and a {@code V2} into the join result
     */
    public KTableKTableOuterJoin(final KTableImpl<K, ?, V1> kTableImpl,
                                 final KTableImpl<K, ?, V2> kTableImpl2,
                                 final ValueJoiner<? super V1, ? super V2, ? extends R> valueJoiner) {
        super(kTableImpl, kTableImpl2, valueJoiner);
    }

    /**
     * Creates a join processor that looks up {@code V2} values through a getter obtained from
     * {@code valueGetterSupplier2}.
     */
    @Override
    public Processor<K, Change<V1>> get() {
        final KTableValueGetter<K, V2> otherSideGetter = valueGetterSupplier2.get();
        return new KTableKTableOuterJoinProcessor(otherSideGetter);
    }

    /**
     * Returns a value getter supplier for the join result, built from the value getter suppliers
     * of both sides.
     */
    @Override
    public KTableValueGetterSupplier<K, R> view() {
        return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
    }
}
