package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.class */
public class ForeignJoinSubscriptionProcessorSupplierTest {
    final Map<String, ValueAndTimestamp<String>> fks = Collections.singletonMap("fk1", ValueAndTimestamp.make("foo", 1));
    final KTableValueGetterSupplier<String, String> valueGetterSupplier = valueGetterSupplier(this.fks);
    final Processor<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>, String, SubscriptionResponseWrapper<String>> processor = processor(this.valueGetterSupplier);

    @Test
    public void shouldDetectVersionChange() {
        Assert.assertEquals(1L, 1L);
    }

    @Test
    public void shouldDeleteKeyAndPropagateFKV0() {
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        this.processor.init(mockProcessorContext);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1}, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, "pk1", (byte) 0, (Integer) null);
        this.processor.process(new Record(new CombinedKey("fk1", "pk1"), new Change(ValueAndTimestamp.make(subscriptionWrapper, 1L), (Object) null), 1L));
        List forwarded = mockProcessorContext.forwarded();
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(new Record("pk1", new SubscriptionResponseWrapper(subscriptionWrapper.getHash(), (Object) null, (Integer) null), 1L), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldDeleteKeyAndPropagateFKV1() {
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        this.processor.init(mockProcessorContext);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1}, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, "pk1", (byte) 1, 12);
        this.processor.process(new Record(new CombinedKey("fk1", "pk1"), new Change(ValueAndTimestamp.make(subscriptionWrapper, 1L), (Object) null), 1L));
        List forwarded = mockProcessorContext.forwarded();
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(new Record("pk1", new SubscriptionResponseWrapper(subscriptionWrapper.getHash(), (Object) null, 12), 1L), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldPropagateOnlyIfFKAvailableV0() {
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        this.processor.init(mockProcessorContext);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1}, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, "pk1", (byte) 0, (Integer) null);
        this.processor.process(new Record(new CombinedKey("fk1", "pk1"), new Change(ValueAndTimestamp.make(subscriptionWrapper, 1L), (Object) null), 1L));
        List forwarded = mockProcessorContext.forwarded();
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(new Record("pk1", new SubscriptionResponseWrapper(subscriptionWrapper.getHash(), "foo", (Integer) null), 1L), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldPropagateOnlyIfFKAvailableV1() {
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        this.processor.init(mockProcessorContext);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1}, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, "pk1", (byte) 1, 12);
        this.processor.process(new Record(new CombinedKey("fk1", "pk1"), new Change(ValueAndTimestamp.make(subscriptionWrapper, 1L), (Object) null), 1L));
        List forwarded = mockProcessorContext.forwarded();
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(new Record("pk1", new SubscriptionResponseWrapper(subscriptionWrapper.getHash(), "foo", 12), 1L), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldPropagateNullIfNoFKAvailableV0() {
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        this.processor.init(mockProcessorContext);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1}, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, "pk1", (byte) 0, (Integer) null);
        this.processor.process(new Record(new CombinedKey("fk1", "pk1"), new Change(ValueAndTimestamp.make(subscriptionWrapper, 1L), (Object) null), 1L));
        List forwarded = mockProcessorContext.forwarded();
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(new Record("pk1", new SubscriptionResponseWrapper(subscriptionWrapper.getHash(), "foo", (Integer) null), 1L), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
        this.processor.process(new Record(new CombinedKey("fk9000", "pk1"), new Change(ValueAndTimestamp.make(subscriptionWrapper, 1L), (Object) null), 1L));
        List forwarded2 = mockProcessorContext.forwarded();
        Assert.assertEquals(2L, forwarded2.size());
        Assert.assertEquals(new Record("pk1", new SubscriptionResponseWrapper(subscriptionWrapper.getHash(), (Object) null, (Integer) null), 1L), ((MockProcessorContext.CapturedForward) forwarded2.get(1)).record());
    }

    @Test
    public void shouldPropagateNullIfNoFKAvailableV1() {
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        this.processor.init(mockProcessorContext);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1}, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, "pk1", (byte) 1, 12);
        this.processor.process(new Record(new CombinedKey("fk1", "pk1"), new Change(ValueAndTimestamp.make(subscriptionWrapper, 1L), (Object) null), 1L));
        List forwarded = mockProcessorContext.forwarded();
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(new Record("pk1", new SubscriptionResponseWrapper(subscriptionWrapper.getHash(), "foo", 12), 1L), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
        this.processor.process(new Record(new CombinedKey("fk9000", "pk1"), new Change(ValueAndTimestamp.make(subscriptionWrapper, 1L), (Object) null), 1L));
        List forwarded2 = mockProcessorContext.forwarded();
        Assert.assertEquals(2L, forwarded2.size());
        Assert.assertEquals(new Record("pk1", new SubscriptionResponseWrapper(subscriptionWrapper.getHash(), (Object) null, 12), 1L), ((MockProcessorContext.CapturedForward) forwarded2.get(1)).record());
    }

    @Test
    public void shouldDeleteKeyNoPropagateV0() {
        this.processor.init(new MockProcessorContext());
        this.processor.process(new Record(new CombinedKey("fk1", "pk1"), new Change(ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1}, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, "pk1", (byte) 0, (Integer) null), 1L), (Object) null), 1L));
        Assert.assertEquals(0L, r0.forwarded().size());
    }

    @Test
    public void shouldDeleteKeyNoPropagateV1() {
        this.processor.init(new MockProcessorContext());
        this.processor.process(new Record(new CombinedKey("fk1", "pk1"), new Change(ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1}, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, "pk1", (byte) 1, 12), 1L), (Object) null), 1L));
        Assert.assertEquals(0L, r0.forwarded().size());
    }

    private KTableValueGetterSupplier<String, String> valueGetterSupplier(final Map<String, ValueAndTimestamp<String>> map) {
        final KTableValueGetter<String, String> kTableValueGetter = new KTableValueGetter<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionProcessorSupplierTest.1
            public ValueAndTimestamp<String> get(String str) {
                return (ValueAndTimestamp) map.get(str);
            }

            public void init(ProcessorContext processorContext) {
            }
        };
        return new KTableValueGetterSupplier<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionProcessorSupplierTest.2
            public KTableValueGetter<String, String> get() {
                return kTableValueGetter;
            }

            public String[] storeNames() {
                return new String[0];
            }
        };
    }

    private Processor<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>, String, SubscriptionResponseWrapper<String>> processor(KTableValueGetterSupplier<String, String> kTableValueGetterSupplier) {
        return new SubscriptionJoinForeignProcessorSupplier(kTableValueGetterSupplier).get();
    }
}
