package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableSourceTest.class */
/**
 * Tests for a {@link KTable} that is read directly from a topic via
 * {@code KStreamBuilder.table(...)}.
 *
 * <p>The tests cover three things: forwarding of records to a downstream processor,
 * look-ups through the table's value getter, and whether the previous value of a key
 * is forwarded together with the new one.
 */
public class KTableSourceTest {
    /** Topic every test builds its table from and feeds records into. */
    private static final String TOPIC = "topic1";

    private final Serializer<String> strSerializer = new StringSerializer();
    private final Deserializer<String> strDeserializer = new StringDeserializer();

    /**
     * Registers a String/String table on {@link #TOPIC} and returns it as its implementation type,
     * which exposes {@code name}, {@code valueGetterSupplier()} and the old-value switches
     * that the tests below need.
     */
    private KTableImpl<String, ?, String> createTable(final KStreamBuilder builder) {
        // The wildcard for the middle type parameter keeps this downcast free of unchecked warnings.
        return (KTableImpl<String, ?, String>)
                builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, TOPIC);
    }

    /** Creates a fresh temporary directory that is handed to the test driver as its state directory. */
    private static File createStateDir() throws IOException {
        return Files.createTempDirectory("test").toFile();
    }

    /**
     * Records sent to the table's topic, including records with a {@code null} value,
     * reach a processor attached to the table's stream in the order they were sent.
     */
    @Test
    public void testKTable() {
        final KStreamBuilder builder = new KStreamBuilder();
        final KTable<String, String> table =
                builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, TOPIC);

        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);

        final KStreamTestDriver driver = new KStreamTestDriver(builder);

        // NOTE(review): the values are Integers although the table is declared with String serdes;
        // the assertion below only relies on their string form. Presumably the test driver hands
        // records to the topology without serializing them - confirm.
        driver.process(TOPIC, "A", 1);
        driver.process(TOPIC, "B", 2);
        driver.process(TOPIC, "C", 3);
        driver.process(TOPIC, "D", 4);
        driver.process(TOPIC, "A", null);
        driver.process(TOPIC, "B", null);

        Assert.assertEquals(
                Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"),
                supplier.processed);
    }

    /**
     * The value getter obtained from the table returns the most recently processed value
     * for each key, and {@code null} once a {@code null} value has been processed for that key.
     *
     * @throws IOException if the temporary state directory cannot be created
     */
    @Test
    public void testValueGetter() throws IOException {
        final File stateDir = createStateDir();
        try {
            final KStreamBuilder builder = new KStreamBuilder();
            final KTableValueGetterSupplier<String, String> getterSupplier =
                    createTable(builder).valueGetterSupplier();

            final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);

            final KTableValueGetter<String, String> getter = getterSupplier.get();
            getter.init(driver.context());

            driver.process(TOPIC, "A", "01");
            driver.process(TOPIC, "B", "01");
            driver.process(TOPIC, "C", "01");

            Assert.assertEquals("01", getter.get("A"));
            Assert.assertEquals("01", getter.get("B"));
            Assert.assertEquals("01", getter.get("C"));

            driver.process(TOPIC, "A", "02");
            driver.process(TOPIC, "B", "02");

            Assert.assertEquals("02", getter.get("A"));
            Assert.assertEquals("02", getter.get("B"));
            Assert.assertEquals("01", getter.get("C"));

            driver.process(TOPIC, "A", "03");

            Assert.assertEquals("03", getter.get("A"));
            Assert.assertEquals("02", getter.get("B"));
            Assert.assertEquals("01", getter.get("C"));

            driver.process(TOPIC, "A", null);
            driver.process(TOPIC, "B", null);

            Assert.assertNull(getter.get("A"));
            Assert.assertNull(getter.get("B"));
            Assert.assertEquals("01", getter.get("C"));
        } finally {
            // Single cleanup path: runs whether the assertions pass or fail.
            Utils.delete(stateDir);
        }
    }

    /**
     * Without enabling old values, every change seen by a processor attached to the table
     * carries {@code null} on the right-hand side of {@code (new<-old)}, even when the key
     * already had a value.
     *
     * <p>The misspelled method name is kept as is so the class's public test names do not change.
     *
     * @throws IOException if the temporary state directory cannot be created
     */
    @Test
    public void testNotSedingOldValue() throws IOException {
        final File stateDir = createStateDir();
        try {
            final KStreamBuilder builder = new KStreamBuilder();
            final KTableImpl<String, ?, String> table = createTable(builder);

            final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
            // Attach the mock directly below the table's own node.
            builder.addProcessor("proc1", supplier, table.name);

            final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);

            driver.process(TOPIC, "A", "01");
            driver.process(TOPIC, "B", "01");
            driver.process(TOPIC, "C", "01");
            supplier.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");

            driver.process(TOPIC, "A", "02");
            driver.process(TOPIC, "B", "02");
            supplier.checkAndClearResult("A:(02<-null)", "B:(02<-null)");

            driver.process(TOPIC, "A", "03");
            supplier.checkAndClearResult("A:(03<-null)");

            driver.process(TOPIC, "A", null);
            driver.process(TOPIC, "B", null);
            supplier.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
        } finally {
            Utils.delete(stateDir);
        }
    }

    /**
     * After {@code enableSendingOldValues()}, every change seen by a processor attached to the
     * table carries the key's previous value on the right-hand side of {@code (new<-old)}.
     *
     * <p>The misspelled method name is kept as is so the class's public test names do not change.
     *
     * @throws IOException if the temporary state directory cannot be created
     */
    @Test
    public void testSedingOldValue() throws IOException {
        final File stateDir = createStateDir();
        try {
            final KStreamBuilder builder = new KStreamBuilder();
            final KTableImpl<String, ?, String> table = createTable(builder);

            table.enableSendingOldValues();
            Assert.assertTrue(table.sendingOldValueEnabled());

            final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
            // Attach the mock directly below the table's own node.
            builder.addProcessor("proc1", supplier, table.name);

            final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);

            driver.process(TOPIC, "A", "01");
            driver.process(TOPIC, "B", "01");
            driver.process(TOPIC, "C", "01");
            supplier.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");

            driver.process(TOPIC, "A", "02");
            driver.process(TOPIC, "B", "02");
            supplier.checkAndClearResult("A:(02<-01)", "B:(02<-01)");

            driver.process(TOPIC, "A", "03");
            supplier.checkAndClearResult("A:(03<-02)");

            driver.process(TOPIC, "A", null);
            driver.process(TOPIC, "B", null);
            supplier.checkAndClearResult("A:(null<-03)", "B:(null<-02)");
        } finally {
            Utils.delete(stateDir);
        }
    }
}
