package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.class */
public class KTableTransformValuesTest {
    private static final String QUERYABLE_NAME = "queryable-store";
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String STORE_NAME = "someStore";
    private static final String OTHER_STORE_NAME = "otherStore";
    private static final Consumed<String, String> CONSUMED = Consumed.with(Serdes.String(), Serdes.String());
    private TopologyTestDriver driver;
    private MockProcessorSupplier<String, String> capture;
    private StreamsBuilder builder;

    @Mock(MockType.NICE)
    private KTableImpl<String, String, String> parent;

    @Mock(MockType.NICE)
    private InternalProcessorContext context;

    @Mock(MockType.NICE)
    private KTableValueGetterSupplier<String, String> parentGetterSupplier;

    @Mock(MockType.NICE)
    private KTableValueGetter<String, String> parentGetter;

    @Mock(MockType.NICE)
    private TimestampedKeyValueStore<String, String> stateStore;

    @Mock(MockType.NICE)
    private ValueTransformerWithKeySupplier<String, String, String> mockSupplier;

    @Mock(MockType.NICE)
    private ValueTransformerWithKey<String, String, String> transformer;

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest$ExclamationValueTransformer.class */
    public static class ExclamationValueTransformer implements ValueTransformerWithKey<Object, String, String> {
        private final List<String> expectedStoredNames;

        ExclamationValueTransformer(List<String> list) {
            this.expectedStoredNames = list;
        }

        public void init(ProcessorContext processorContext) {
            KTableTransformValuesTest.throwIfStoresNotAvailable(processorContext, this.expectedStoredNames);
        }

        public String transform(Object obj, String str) {
            return obj.toString() + "->" + str + "!";
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest$ExclamationValueTransformerSupplier.class */
    public static class ExclamationValueTransformerSupplier implements ValueTransformerWithKeySupplier<Object, String, String> {
        private final List<String> expectedStoredNames;

        ExclamationValueTransformerSupplier(String... strArr) {
            this.expectedStoredNames = Arrays.asList(strArr);
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public ExclamationValueTransformer m82get() {
            return new ExclamationValueTransformer(this.expectedStoredNames);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest$NullSupplier.class */
    private static class NullSupplier implements ValueTransformerWithKeySupplier<String, String, String> {
        private NullSupplier() {
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public ValueTransformerWithKey<String, String, String> m83get() {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest$StatefulTransformer.class */
    public static class StatefulTransformer implements ValueTransformerWithKey<String, String, Integer> {
        private int counter;

        private StatefulTransformer() {
        }

        public void init(ProcessorContext processorContext) {
        }

        public Integer transform(String str, String str2) {
            int i = this.counter + 1;
            this.counter = i;
            return Integer.valueOf(i);
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest$StatefulTransformerSupplier.class */
    private static class StatefulTransformerSupplier implements ValueTransformerWithKeySupplier<String, String, Integer> {
        private StatefulTransformerSupplier() {
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public ValueTransformerWithKey<String, String, Integer> m84get() {
            return new StatefulTransformer();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest$StatelessTransformer.class */
    public static class StatelessTransformer implements ValueTransformerWithKey<String, String, Integer> {
        private StatelessTransformer() {
        }

        public void init(ProcessorContext processorContext) {
        }

        public Integer transform(String str, String str2) {
            if (str2 == null) {
                return null;
            }
            return Integer.valueOf(str2.length());
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest$StatelessTransformerSupplier.class */
    private static class StatelessTransformerSupplier implements ValueTransformerWithKeySupplier<String, String, Integer> {
        private StatelessTransformerSupplier() {
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public ValueTransformerWithKey<String, String, Integer> m85get() {
            return new StatelessTransformer();
        }
    }

    @After
    public void cleanup() {
        if (this.driver != null) {
            this.driver.close();
            this.driver = null;
        }
    }

    @Before
    public void setUp() {
        this.capture = new MockProcessorSupplier<>();
        this.builder = new StreamsBuilder();
    }

    @Test
    public void shouldThrowOnGetIfSupplierReturnsNull() {
        try {
            new KTableTransformValues(this.parent, new NullSupplier(), QUERYABLE_NAME).get();
            Assert.fail("NPE expected");
        } catch (NullPointerException e) {
        }
    }

    @Test
    public void shouldThrowOnViewGetIfSupplierReturnsNull() {
        try {
            new KTableTransformValues(this.parent, new NullSupplier(), (String) null).view().get();
            Assert.fail("NPE expected");
        } catch (NullPointerException e) {
        }
    }

    @Test
    public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
        NoOpValueTransformerWithKeySupplier noOpValueTransformerWithKeySupplier = new NoOpValueTransformerWithKeySupplier();
        new KTableTransformValues(this.parent, noOpValueTransformerWithKeySupplier, (String) null).get().init(this.context);
        MatcherAssert.assertThat(noOpValueTransformerWithKeySupplier.context, CoreMatchers.isA(ForwardingDisabledProcessorContext.class));
    }

    @Test
    public void shouldNotSendOldValuesByDefault() {
        Processor processor = new KTableTransformValues(this.parent, new ExclamationValueTransformerSupplier(new String[0]), (String) null).get();
        processor.init(this.context);
        this.context.forward("Key", new Change("Key->newValue!", (Object) null));
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.context});
        processor.process("Key", new Change("newValue", "oldValue"));
        EasyMock.verify(new Object[]{this.context});
    }

    @Test
    public void shouldSendOldValuesIfConfigured() {
        KTableTransformValues kTableTransformValues = new KTableTransformValues(this.parent, new ExclamationValueTransformerSupplier(new String[0]), (String) null);
        EasyMock.expect(Boolean.valueOf(this.parent.enableSendingOldValues(true))).andReturn(true);
        EasyMock.replay(new Object[]{this.parent});
        kTableTransformValues.enableSendingOldValues(true);
        Processor processor = kTableTransformValues.get();
        processor.init(this.context);
        this.context.forward("Key", new Change("Key->newValue!", "Key->oldValue!"));
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.context});
        processor.process("Key", new Change("newValue", "oldValue"));
        EasyMock.verify(new Object[]{this.context});
    }

    @Test
    public void shouldNotSetSendOldValuesOnParentIfMaterialized() {
        EasyMock.expect(Boolean.valueOf(this.parent.enableSendingOldValues(EasyMock.anyBoolean()))).andThrow(new AssertionError("Should not call enableSendingOldValues")).anyTimes();
        EasyMock.replay(new Object[]{this.parent});
        new KTableTransformValues(this.parent, new NoOpValueTransformerWithKeySupplier(), QUERYABLE_NAME).enableSendingOldValues(true);
        EasyMock.verify(new Object[]{this.parent});
    }

    @Test
    public void shouldSetSendOldValuesOnParentIfNotMaterialized() {
        EasyMock.expect(Boolean.valueOf(this.parent.enableSendingOldValues(true))).andReturn(true);
        EasyMock.replay(new Object[]{this.parent});
        new KTableTransformValues(this.parent, new NoOpValueTransformerWithKeySupplier(), (String) null).enableSendingOldValues(true);
        EasyMock.verify(new Object[]{this.parent});
    }

    @Test
    public void shouldTransformOnGetIfNotMaterialized() {
        KTableTransformValues kTableTransformValues = new KTableTransformValues(this.parent, new ExclamationValueTransformerSupplier(new String[0]), (String) null);
        EasyMock.expect(this.parent.valueGetterSupplier()).andReturn(this.parentGetterSupplier);
        EasyMock.expect(this.parentGetterSupplier.get()).andReturn(this.parentGetter);
        EasyMock.expect(this.parentGetter.get("Key")).andReturn(ValueAndTimestamp.make("Value", 73L));
        ProcessorRecordContext processorRecordContext = new ProcessorRecordContext(42L, 23L, -1, "foo", new RecordHeaders());
        EasyMock.expect(this.context.recordContext()).andReturn(processorRecordContext);
        this.context.setRecordContext(new ProcessorRecordContext(73L, -1L, -1, (String) null, new RecordHeaders()));
        EasyMock.expectLastCall();
        this.context.setRecordContext(processorRecordContext);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.parent, this.parentGetterSupplier, this.parentGetter, this.context});
        KTableValueGetter kTableValueGetter = kTableTransformValues.view().get();
        kTableValueGetter.init(this.context);
        MatcherAssert.assertThat((String) kTableValueGetter.get("Key").value(), CoreMatchers.is("Key->Value!"));
        EasyMock.verify(new Object[]{this.context});
    }

    @Test
    public void shouldGetFromStateStoreIfMaterialized() {
        KTableTransformValues kTableTransformValues = new KTableTransformValues(this.parent, new ExclamationValueTransformerSupplier(new String[0]), QUERYABLE_NAME);
        EasyMock.expect(this.context.getStateStore(QUERYABLE_NAME)).andReturn(this.stateStore);
        EasyMock.expect(this.stateStore.get("Key")).andReturn(ValueAndTimestamp.make("something", 0L));
        EasyMock.replay(new Object[]{this.context, this.stateStore});
        KTableValueGetter kTableValueGetter = kTableTransformValues.view().get();
        kTableValueGetter.init(this.context);
        MatcherAssert.assertThat((String) kTableValueGetter.get("Key").value(), CoreMatchers.is("something"));
    }

    @Test
    public void shouldGetStoreNamesFromParentIfNotMaterialized() {
        KTableTransformValues kTableTransformValues = new KTableTransformValues(this.parent, new ExclamationValueTransformerSupplier(new String[0]), (String) null);
        EasyMock.expect(this.parent.valueGetterSupplier()).andReturn(this.parentGetterSupplier);
        EasyMock.expect(this.parentGetterSupplier.storeNames()).andReturn(new String[]{"store1", "store2"});
        EasyMock.replay(new Object[]{this.parent, this.parentGetterSupplier});
        MatcherAssert.assertThat(kTableTransformValues.view().storeNames(), CoreMatchers.is(new String[]{"store1", "store2"}));
    }

    @Test
    public void shouldGetQueryableStoreNameIfMaterialized() {
        MatcherAssert.assertThat(new KTableTransformValues(this.parent, new ExclamationValueTransformerSupplier(new String[0]), QUERYABLE_NAME).view().storeNames(), CoreMatchers.is(new String[]{QUERYABLE_NAME}));
    }

    @Test
    public void shouldCloseTransformerOnProcessorClose() {
        KTableTransformValues kTableTransformValues = new KTableTransformValues(this.parent, this.mockSupplier, (String) null);
        EasyMock.expect(this.mockSupplier.get()).andReturn(this.transformer);
        this.transformer.close();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.mockSupplier, this.transformer});
        kTableTransformValues.get().close();
        EasyMock.verify(new Object[]{this.transformer});
    }

    @Test
    public void shouldCloseTransformerOnGetterClose() {
        KTableTransformValues kTableTransformValues = new KTableTransformValues(this.parent, this.mockSupplier, (String) null);
        EasyMock.expect(this.mockSupplier.get()).andReturn(this.transformer);
        EasyMock.expect(this.parentGetterSupplier.get()).andReturn(this.parentGetter);
        EasyMock.expect(this.parent.valueGetterSupplier()).andReturn(this.parentGetterSupplier);
        this.transformer.close();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.mockSupplier, this.transformer, this.parent, this.parentGetterSupplier});
        kTableTransformValues.view().get().close();
        EasyMock.verify(new Object[]{this.transformer});
    }

    @Test
    public void shouldCloseParentGetterClose() {
        KTableTransformValues kTableTransformValues = new KTableTransformValues(this.parent, this.mockSupplier, (String) null);
        EasyMock.expect(this.parent.valueGetterSupplier()).andReturn(this.parentGetterSupplier);
        EasyMock.expect(this.mockSupplier.get()).andReturn(this.transformer);
        EasyMock.expect(this.parentGetterSupplier.get()).andReturn(this.parentGetter);
        this.parentGetter.close();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.mockSupplier, this.parent, this.parentGetterSupplier, this.parentGetter});
        kTableTransformValues.view().get().close();
        EasyMock.verify(new Object[]{this.parentGetter});
    }

    @Test
    public void shouldTransformValuesWithKey() {
        this.builder.addStateStore(storeBuilder(STORE_NAME)).addStateStore(storeBuilder(OTHER_STORE_NAME)).table(INPUT_TOPIC, CONSUMED).transformValues(new ExclamationValueTransformerSupplier(STORE_NAME, OTHER_STORE_NAME), new String[]{STORE_NAME, OTHER_STORE_NAME}).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), props());
        TestInputTopic createInputTopic = this.driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
        createInputTopic.pipeInput("A", "a", 5L);
        createInputTopic.pipeInput("B", "b", 10L);
        createInputTopic.pipeInput("D", (Object) null, 15L);
        MatcherAssert.assertThat(output(), CoreMatchers.hasItems(new KeyValueTimestamp[]{new KeyValueTimestamp("A", "A->a!", 5L), new KeyValueTimestamp("B", "B->b!", 10L), new KeyValueTimestamp("D", "D->null!", 15L)}));
        Assert.assertNull("Store should not be materialized", this.driver.getKeyValueStore(QUERYABLE_NAME));
    }

    @Test
    public void shouldTransformValuesWithKeyAndMaterialize() {
        this.builder.addStateStore(storeBuilder(STORE_NAME)).table(INPUT_TOPIC, CONSUMED).transformValues(new ExclamationValueTransformerSupplier(STORE_NAME, QUERYABLE_NAME), Materialized.as(QUERYABLE_NAME).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()), new String[]{STORE_NAME}).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), props());
        TestInputTopic createInputTopic = this.driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
        createInputTopic.pipeInput("A", "a", 5L);
        createInputTopic.pipeInput("B", "b", 10L);
        createInputTopic.pipeInput("C", (Object) null, 15L);
        MatcherAssert.assertThat(output(), CoreMatchers.hasItems(new KeyValueTimestamp[]{new KeyValueTimestamp("A", "A->a!", 5L), new KeyValueTimestamp("B", "B->b!", 10L), new KeyValueTimestamp("C", "C->null!", 15L)}));
        KeyValueStore keyValueStore = this.driver.getKeyValueStore(QUERYABLE_NAME);
        MatcherAssert.assertThat(keyValueStore.get("A"), CoreMatchers.is("A->a!"));
        MatcherAssert.assertThat(keyValueStore.get("B"), CoreMatchers.is("B->b!"));
        MatcherAssert.assertThat(keyValueStore.get("C"), CoreMatchers.is("C->null!"));
        KeyValueStore timestampedKeyValueStore = this.driver.getTimestampedKeyValueStore(QUERYABLE_NAME);
        MatcherAssert.assertThat(timestampedKeyValueStore.get("A"), CoreMatchers.is(ValueAndTimestamp.make("A->a!", 5L)));
        MatcherAssert.assertThat(timestampedKeyValueStore.get("B"), CoreMatchers.is(ValueAndTimestamp.make("B->b!", 10L)));
        MatcherAssert.assertThat(timestampedKeyValueStore.get("C"), CoreMatchers.is(ValueAndTimestamp.make("C->null!", 15L)));
    }

    @Test
    public void shouldCalculateCorrectOldValuesIfMaterializedEvenIfStateful() {
        this.builder.table(INPUT_TOPIC, CONSUMED).transformValues(new StatefulTransformerSupplier(), Materialized.as(QUERYABLE_NAME).withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer()), new String[0]).groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer())).reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR).mapValues(mapBackToStrings()).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), props());
        TestInputTopic createInputTopic = this.driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
        createInputTopic.pipeInput("A", "ignored", 5L);
        createInputTopic.pipeInput("A", "ignored1", 15L);
        createInputTopic.pipeInput("A", "ignored2", 10L);
        MatcherAssert.assertThat(output(), CoreMatchers.hasItems(new KeyValueTimestamp[]{new KeyValueTimestamp("A", "1", 5L), new KeyValueTimestamp("A", "0", 15L), new KeyValueTimestamp("A", "2", 15L), new KeyValueTimestamp("A", "0", 15L), new KeyValueTimestamp("A", "3", 15L)}));
        MatcherAssert.assertThat(this.driver.getKeyValueStore(QUERYABLE_NAME).get("A"), CoreMatchers.is(3));
    }

    @Test
    public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() {
        this.builder.table(INPUT_TOPIC, CONSUMED).transformValues(new StatelessTransformerSupplier(), new String[0]).groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer())).reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR).mapValues(mapBackToStrings()).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), props());
        TestInputTopic createInputTopic = this.driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
        createInputTopic.pipeInput("A", "a", 5L);
        createInputTopic.pipeInput("A", "aa", 15L);
        createInputTopic.pipeInput("A", "aaa", 10L);
        MatcherAssert.assertThat(output(), CoreMatchers.hasItems(new KeyValueTimestamp[]{new KeyValueTimestamp("A", "1", 5L), new KeyValueTimestamp("A", "0", 15L), new KeyValueTimestamp("A", "2", 15L), new KeyValueTimestamp("A", "0", 15L), new KeyValueTimestamp("A", "3", 15L)}));
    }

    private ArrayList<KeyValueTimestamp<String, String>> output() {
        return this.capture.capturedProcessors(1).get(0).processed();
    }

    private static KeyValueMapper<String, Integer, KeyValue<String, Integer>> toForceSendingOfOldValues() {
        return (v1, v2) -> {
            return new KeyValue(v1, v2);
        };
    }

    private static ValueMapper<Integer, String> mapBackToStrings() {
        return (v0) -> {
            return v0.toString();
        };
    }

    private static StoreBuilder<KeyValueStore<Long, Long>> storeBuilder(String str) {
        return Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(str), Serdes.Long(), Serdes.Long());
    }

    public static Properties props() {
        Properties properties = new Properties();
        properties.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        properties.setProperty("default.key.serde", Serdes.Integer().getClass().getName());
        properties.setProperty("default.value.serde", Serdes.Integer().getClass().getName());
        return properties;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void throwIfStoresNotAvailable(ProcessorContext processorContext, List<String> list) {
        ArrayList arrayList = new ArrayList();
        for (String str : list) {
            if (processorContext.getStateStore(str) == null) {
                arrayList.add(str);
            }
        }
        if (!arrayList.isEmpty()) {
            throw new AssertionError("State stores are not accessible: " + arrayList);
        }
    }
}
