package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Objects;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.class */
public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO> implements ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>> {
    private final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier;

    /* renamed from: org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinForeignProcessorSupplier$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$streams$kstream$internals$foreignkeyjoin$SubscriptionWrapper$Instruction = new int[SubscriptionWrapper.Instruction.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$streams$kstream$internals$foreignkeyjoin$SubscriptionWrapper$Instruction[SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$kstream$internals$foreignkeyjoin$SubscriptionWrapper$Instruction[SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$kstream$internals$foreignkeyjoin$SubscriptionWrapper$Instruction[SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$kstream$internals$foreignkeyjoin$SubscriptionWrapper$Instruction[SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public SubscriptionJoinForeignProcessorSupplier(KTableValueGetterSupplier<KO, VO> kTableValueGetterSupplier) {
        this.foreignValueGetterSupplier = kTableValueGetterSupplier;
    }

    @Override // org.apache.kafka.streams.processor.api.ProcessorSupplier, java.util.function.Supplier
    public Processor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>> get() {
        return new ContextualProcessor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>>() { // from class: org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinForeignProcessorSupplier.1
            private KTableValueGetter<KO, VO> foreignValues;

            @Override // org.apache.kafka.streams.processor.api.ContextualProcessor, org.apache.kafka.streams.processor.api.Processor
            public void init(ProcessorContext<K, SubscriptionResponseWrapper<VO>> processorContext) {
                super.init(processorContext);
                this.foreignValues = SubscriptionJoinForeignProcessorSupplier.this.foreignValueGetterSupplier.get();
                this.foreignValues.init(processorContext);
            }

            @Override // org.apache.kafka.streams.processor.api.Processor
            public void process(Record<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> record) {
                Objects.requireNonNull(record.key(), "This processor should never see a null key.");
                Objects.requireNonNull(record.value(), "This processor should never see a null value.");
                ValueAndTimestamp<SubscriptionWrapper<K>> valueAndTimestamp = record.value().newValue;
                Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
                SubscriptionWrapper<K> value = valueAndTimestamp.value();
                if (value.getVersion() > 1) {
                    throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
                }
                ValueAndTimestamp<VO> valueAndTimestamp2 = this.foreignValues.get(record.key().getForeignKey());
                long timestamp = valueAndTimestamp2 == null ? valueAndTimestamp.timestamp() : Math.max(valueAndTimestamp.timestamp(), valueAndTimestamp2.timestamp());
                switch (AnonymousClass2.$SwitchMap$org$apache$kafka$streams$kstream$internals$foreignkeyjoin$SubscriptionWrapper$Instruction[value.getInstruction().ordinal()]) {
                    case 1:
                        context().forward(record.withKey(record.key().getPrimaryKey()).withValue(new SubscriptionResponseWrapper(value.getHash(), null, value.getPrimaryPartition())).withTimestamp(timestamp));
                        return;
                    case 2:
                        context().forward(record.withKey(record.key().getPrimaryKey()).withValue(new SubscriptionResponseWrapper(value.getHash(), valueAndTimestamp2 == null ? null : valueAndTimestamp2.value(), value.getPrimaryPartition())).withTimestamp(timestamp));
                        return;
                    case 3:
                        if (valueAndTimestamp2 != null) {
                            context().forward(record.withKey(record.key().getPrimaryKey()).withValue(new SubscriptionResponseWrapper(value.getHash(), valueAndTimestamp2.value(), value.getPrimaryPartition())).withTimestamp(timestamp));
                            return;
                        }
                        return;
                    case 4:
                        return;
                    default:
                        throw new IllegalStateException("Unhandled instruction: " + value.getInstruction());
                }
            }
        };
    }
}
