package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.class */
public class SubscriptionReceiveProcessorSupplierTest {
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    private File stateDir;
    private MockInternalNewProcessorContext<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> context;
    private static final String FK = "fk1";
    private static final String PK1 = "pk1";
    private static final String PK2 = "pk2";
    private static final Supplier<String> PK_SERDE_TOPIC_SUPPLIER = () -> {
        return "pk-topic";
    };
    private static final CombinedKeySchema<String, String> COMBINED_KEY_SCHEMA = new CombinedKeySchema<>(() -> {
        return "fk-topic";
    }, Serdes.String(), PK_SERDE_TOPIC_SUPPLIER, Serdes.String());

    @Before
    public void before() {
        this.stateDir = TestUtils.tempDirectory();
        this.context = new MockInternalNewProcessorContext<>(this.props, new TaskId(0, 0), this.stateDir);
    }

    @After
    public void after() throws IOException {
        Utils.delete(this.stateDir);
    }

    @Test
    public void shouldDetectVersionChange() {
        Assert.assertEquals(1L, 1L);
    }

    @Test
    public void shouldDeleteKeyAndPropagateV0() {
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        Processor processor = supplier(storeBuilder).get();
        TimestampedKeyValueStore build = storeBuilder.build();
        this.context.addStateStore(build);
        build.init(this.context, build);
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, PK2, (byte) 0, (Integer) null), 0L);
        Bytes bytes = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
        build.put(bytes, make);
        processor.init(this.context);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, PK1, (byte) 0, (Integer) null);
        ValueAndTimestamp make2 = ValueAndTimestamp.make(subscriptionWrapper, 1L);
        Record record = new Record(FK, subscriptionWrapper, 1L);
        processor.process(record);
        List forwarded = this.context.forwarded();
        Assert.assertNull(build.get(bytes));
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(record.withKey(new CombinedKey(FK, PK1)).withValue(new Change(make2, make)), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldDeleteKeyAndPropagateV1() {
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        Processor processor = supplier(storeBuilder).get();
        TimestampedKeyValueStore build = storeBuilder.build();
        this.context.addStateStore(build);
        build.init(this.context, build);
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, PK2, (byte) 1, 1), 0L);
        Bytes bytes = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
        build.put(bytes, make);
        processor.init(this.context);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, PK1, (byte) 1, 1);
        ValueAndTimestamp make2 = ValueAndTimestamp.make(subscriptionWrapper, 1L);
        Record record = new Record(FK, subscriptionWrapper, 1L);
        processor.process(record);
        List forwarded = this.context.forwarded();
        Assert.assertNull(build.get(bytes));
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(record.withKey(new CombinedKey(FK, PK1)).withValue(new Change(make2, make)), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldDeleteKeyNoPropagateV0() {
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        Processor processor = supplier(storeBuilder).get();
        TimestampedKeyValueStore build = storeBuilder.build();
        this.context.addStateStore(build);
        build.init(this.context, build);
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, PK2, (byte) 0, (Integer) null), 0L);
        Bytes bytes = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
        build.put(bytes, make);
        processor.init(this.context);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, PK1, (byte) 0, (Integer) null);
        ValueAndTimestamp make2 = ValueAndTimestamp.make(subscriptionWrapper, 1L);
        Record record = new Record(FK, subscriptionWrapper, 1L);
        processor.process(record);
        List forwarded = this.context.forwarded();
        Assert.assertNull(build.get(bytes));
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(record.withKey(new CombinedKey(FK, PK1)).withValue(new Change(make2, make)), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldDeleteKeyNoPropagateV1() {
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        Processor processor = supplier(storeBuilder).get();
        TimestampedKeyValueStore build = storeBuilder.build();
        this.context.addStateStore(build);
        build.init(this.context, build);
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, PK2, (byte) 1, 1), 0L);
        Bytes bytes = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
        build.put(bytes, make);
        processor.init(this.context);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, PK1, (byte) 1, 1);
        ValueAndTimestamp make2 = ValueAndTimestamp.make(subscriptionWrapper, 1L);
        Record record = new Record(FK, subscriptionWrapper, 1L);
        processor.process(record);
        List forwarded = this.context.forwarded();
        Assert.assertNull(build.get(bytes));
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(record.withKey(new CombinedKey(FK, PK1)).withValue(new Change(make2, make)), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldPropagateOnlyIfFKValAvailableV0() {
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        Processor processor = supplier(storeBuilder).get();
        TimestampedKeyValueStore build = storeBuilder.build();
        this.context.addStateStore(build);
        build.init(this.context, build);
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, PK2, (byte) 0, (Integer) null), 0L);
        Bytes bytes = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
        build.put(bytes, make);
        processor.init(this.context);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, PK1, (byte) 0, (Integer) null);
        ValueAndTimestamp make2 = ValueAndTimestamp.make(subscriptionWrapper, 1L);
        Record record = new Record(FK, subscriptionWrapper, 1L);
        processor.process(record);
        List forwarded = this.context.forwarded();
        Assert.assertEquals(make2, build.get(bytes));
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(record.withKey(new CombinedKey(FK, PK1)).withValue(new Change(make2, make)), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldPropagateOnlyIfFKValAvailableV1() {
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        Processor processor = supplier(storeBuilder).get();
        TimestampedKeyValueStore build = storeBuilder.build();
        this.context.addStateStore(build);
        build.init(this.context, build);
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, PK2, (byte) 1, 1), 0L);
        Bytes bytes = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
        build.put(bytes, make);
        processor.init(this.context);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, PK1, (byte) 1, 1);
        ValueAndTimestamp make2 = ValueAndTimestamp.make(subscriptionWrapper, 1L);
        Record record = new Record(FK, subscriptionWrapper, 1L);
        processor.process(record);
        List forwarded = this.context.forwarded();
        Assert.assertEquals(make2, build.get(bytes));
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(record.withKey(new CombinedKey(FK, PK1)).withValue(new Change(make2, make)), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldPropagateNullIfNoFKValAvailableV0() {
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        Processor processor = supplier(storeBuilder).get();
        TimestampedKeyValueStore build = storeBuilder.build();
        this.context.addStateStore(build);
        build.init(this.context, build);
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, PK2, (byte) 0, (Integer) null), 0L);
        Bytes bytes = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
        build.put(bytes, make);
        processor.init(this.context);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, PK1, (byte) 0, (Integer) null);
        ValueAndTimestamp make2 = ValueAndTimestamp.make(subscriptionWrapper, 1L);
        Record record = new Record(FK, subscriptionWrapper, 1L);
        processor.process(record);
        List forwarded = this.context.forwarded();
        Assert.assertEquals(make2, build.get(bytes));
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(record.withKey(new CombinedKey(FK, PK1)).withValue(new Change(make2, make)), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    @Test
    public void shouldPropagateNullIfNoFKValAvailableV1() {
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        Processor processor = supplier(storeBuilder).get();
        TimestampedKeyValueStore build = storeBuilder.build();
        this.context.addStateStore(build);
        build.init(this.context, build);
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, PK2, (byte) 1, 1), 0L);
        Bytes bytes = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
        build.put(bytes, make);
        processor.init(this.context);
        SubscriptionWrapper subscriptionWrapper = new SubscriptionWrapper(new long[]{1, 2}, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, PK1, (byte) 1, 1);
        ValueAndTimestamp make2 = ValueAndTimestamp.make(subscriptionWrapper, 1L);
        Record record = new Record(FK, subscriptionWrapper, 1L);
        processor.process(record);
        List forwarded = this.context.forwarded();
        Assert.assertEquals(make2, build.get(bytes));
        Assert.assertEquals(1L, forwarded.size());
        Assert.assertEquals(record.withKey(new CombinedKey(FK, PK1)).withValue(new Change(make2, make)), ((MockProcessorContext.CapturedForward) forwarded.get(0)).record());
    }

    private SubscriptionReceiveProcessorSupplier<String, String> supplier(StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder) {
        return new SubscriptionReceiveProcessorSupplier<>(storeBuilder, COMBINED_KEY_SCHEMA);
    }

    private StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder() {
        return Stores.timestampedKeyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore("Store"), new Serdes.BytesSerde(), new SubscriptionWrapperSerde(PK_SERDE_TOPIC_SUPPLIER, Serdes.String()));
    }
}
