package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.class */
public class ResponseJoinProcessorSupplierTest {
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final ValueJoiner<String, String, String> JOINER = (str, str2) -> {
        return "(" + str + "," + str2 + ")";
    };

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest$TestKTableValueGetterSupplier.class */
    private static class TestKTableValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
        private final Map<K, V> map;

        private TestKTableValueGetterSupplier() {
            this.map = new HashMap();
        }

        public KTableValueGetter<K, V> get() {
            return new KTableValueGetter<K, V>() { // from class: org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplierTest.TestKTableValueGetterSupplier.1
                public void init(ProcessorContext<?, ?> processorContext) {
                }

                public ValueAndTimestamp<V> get(K k) {
                    return ValueAndTimestamp.make(TestKTableValueGetterSupplier.this.map.get(k), -1L);
                }

                public boolean isVersioned() {
                    return false;
                }
            };
        }

        public String[] storeNames() {
            return new String[0];
        }

        void put(K k, V v) {
            this.map.put(k, v);
        }
    }

    @Test
    public void shouldNotForwardWhenHashDoesNotMatch() {
        TestKTableValueGetterSupplier testKTableValueGetterSupplier = new TestKTableValueGetterSupplier();
        Processor processor = new ResponseJoinProcessorSupplier(testKTableValueGetterSupplier, STRING_SERIALIZER, () -> {
            return "value-hash-dummy-topic";
        }, JOINER, false).get();
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        mockProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);
        testKTableValueGetterSupplier.put("lhs1", "lhsValue");
        processor.process(new Record("lhs1", new SubscriptionResponseWrapper(Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "oldLhsValue")), "rhsValue", 0), 0L));
        MatcherAssert.assertThat(mockProcessorContext.forwarded(), IsEmptyCollection.empty());
    }

    @Test
    public void shouldIgnoreUpdateWhenLeftHasBecomeNull() {
        TestKTableValueGetterSupplier testKTableValueGetterSupplier = new TestKTableValueGetterSupplier();
        Processor processor = new ResponseJoinProcessorSupplier(testKTableValueGetterSupplier, STRING_SERIALIZER, () -> {
            return "value-hash-dummy-topic";
        }, JOINER, false).get();
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        mockProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);
        testKTableValueGetterSupplier.put("lhs1", null);
        processor.process(new Record("lhs1", new SubscriptionResponseWrapper(Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue")), "rhsValue", 0), 0L));
        MatcherAssert.assertThat(mockProcessorContext.forwarded(), IsEmptyCollection.empty());
    }

    @Test
    public void shouldForwardWhenHashMatches() {
        TestKTableValueGetterSupplier testKTableValueGetterSupplier = new TestKTableValueGetterSupplier();
        Processor processor = new ResponseJoinProcessorSupplier(testKTableValueGetterSupplier, STRING_SERIALIZER, () -> {
            return "value-hash-dummy-topic";
        }, JOINER, false).get();
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        mockProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);
        testKTableValueGetterSupplier.put("lhs1", "lhsValue");
        processor.process(new Record("lhs1", new SubscriptionResponseWrapper(Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue")), "rhsValue", 0), 0L));
        List forwarded = mockProcessorContext.forwarded();
        MatcherAssert.assertThat(Integer.valueOf(forwarded.size()), CoreMatchers.is(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) forwarded.get(0)).record(), CoreMatchers.is(new Record("lhs1", "(lhsValue,rhsValue)", 0L)));
    }

    @Test
    public void shouldEmitTombstoneForInnerJoinWhenRightIsNull() {
        TestKTableValueGetterSupplier testKTableValueGetterSupplier = new TestKTableValueGetterSupplier();
        Processor processor = new ResponseJoinProcessorSupplier(testKTableValueGetterSupplier, STRING_SERIALIZER, () -> {
            return "value-hash-dummy-topic";
        }, JOINER, false).get();
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        mockProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);
        testKTableValueGetterSupplier.put("lhs1", "lhsValue");
        processor.process(new Record("lhs1", new SubscriptionResponseWrapper(Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue")), (Object) null, 0), 0L));
        List forwarded = mockProcessorContext.forwarded();
        MatcherAssert.assertThat(Integer.valueOf(forwarded.size()), CoreMatchers.is(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) forwarded.get(0)).record(), CoreMatchers.is(new Record("lhs1", (Object) null, 0L)));
    }

    @Test
    public void shouldEmitResultForLeftJoinWhenRightIsNull() {
        TestKTableValueGetterSupplier testKTableValueGetterSupplier = new TestKTableValueGetterSupplier();
        Processor processor = new ResponseJoinProcessorSupplier(testKTableValueGetterSupplier, STRING_SERIALIZER, () -> {
            return "value-hash-dummy-topic";
        }, JOINER, true).get();
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        mockProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);
        testKTableValueGetterSupplier.put("lhs1", "lhsValue");
        processor.process(new Record("lhs1", new SubscriptionResponseWrapper(Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue")), (Object) null, 0), 0L));
        List forwarded = mockProcessorContext.forwarded();
        MatcherAssert.assertThat(Integer.valueOf(forwarded.size()), CoreMatchers.is(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) forwarded.get(0)).record(), CoreMatchers.is(new Record("lhs1", "(lhsValue,null)", 0L)));
    }

    @Test
    public void shouldEmitTombstoneForLeftJoinWhenRightIsNullAndLeftIsNull() {
        TestKTableValueGetterSupplier testKTableValueGetterSupplier = new TestKTableValueGetterSupplier();
        Processor processor = new ResponseJoinProcessorSupplier(testKTableValueGetterSupplier, STRING_SERIALIZER, () -> {
            return "value-hash-dummy-topic";
        }, JOINER, true).get();
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        mockProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);
        testKTableValueGetterSupplier.put("lhs1", null);
        processor.process(new Record("lhs1", new SubscriptionResponseWrapper((long[]) null, (Object) null, 0), 0L));
        List forwarded = mockProcessorContext.forwarded();
        MatcherAssert.assertThat(Integer.valueOf(forwarded.size()), CoreMatchers.is(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) forwarded.get(0)).record(), CoreMatchers.is(new Record("lhs1", (Object) null, 0L)));
    }
}
