package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.class */
public class KStreamNewProcessorApiTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest$TransformerSupplier.class */
    public static class TransformerSupplier implements FixedKeyProcessorSupplier<String, String, String> {
        private final StoreBuilder<?> storeBuilder;

        public TransformerSupplier(StoreBuilder<?> storeBuilder) {
            this.storeBuilder = storeBuilder;
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public ContextualFixedKeyProcessor<String, String, String> m103get() {
            return new ContextualFixedKeyProcessor<String, String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamNewProcessorApiTest.TransformerSupplier.1
                KeyValueStore<String, String> store;
                FixedKeyProcessorContext<String, String> context;

                public void init(FixedKeyProcessorContext<String, String> fixedKeyProcessorContext) {
                    super.init(fixedKeyProcessorContext);
                    this.store = fixedKeyProcessorContext.getStateStore("store");
                    Objects.requireNonNull(this.store, "State store can't be null");
                    this.context = fixedKeyProcessorContext;
                }

                public void process(FixedKeyRecord<String, String> fixedKeyRecord) {
                    this.store.putIfAbsent(fixedKeyRecord.key(), ((String) fixedKeyRecord.value()) + "Updated");
                    context().forward(fixedKeyRecord.withValue(((String) fixedKeyRecord.value()) + "Updated"));
                }
            };
        }

        public Set<StoreBuilder<?>> stores() {
            if (this.storeBuilder != null) {
                return Collections.singleton(this.storeBuilder);
            }
            return null;
        }
    }

    @DisplayName("Should attach the state store using ConnectedStoreProvider")
    @Test
    void shouldGetStateStoreWithConnectedStoreProvider() {
        runTest(false);
    }

    @DisplayName("Should attach the state store StreamBuilder.addStateStore")
    @Test
    void shouldGetStateStoreWithStreamBuilder() {
        runTest(true);
    }

    private void runTest(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        StoreBuilder keyValueStoreBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String());
        if (z) {
            streamsBuilder.addStateStore(keyValueStoreBuilder);
        }
        streamsBuilder.stream("input", Consumed.with(Serdes.String(), Serdes.String())).processValues(new TransformerSupplier(z ? null : keyValueStoreBuilder), new String[]{"store"}).to("output", Produced.with(Serdes.String(), Serdes.String()));
        List asList = Arrays.asList(KeyValue.pair("a", "foo"), KeyValue.pair("b", "bar"), KeyValue.pair("c", "baz"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build());
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input", Serdes.String().serializer(), Serdes.String().serializer());
                asList.forEach(keyValue -> {
                    createInputTopic.pipeInput(keyValue.key, keyValue.value);
                });
                List asList2 = Arrays.asList("fooUpdated", "barUpdated", "bazUpdated");
                ArrayList arrayList = new ArrayList(topologyTestDriver.createOutputTopic("output", Serdes.String().deserializer(), Serdes.String().deserializer()).readValuesToList());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                Assertions.assertEquals(asList2, arrayList);
                Assertions.assertEquals(keyValueStore.get("a"), "fooUpdated");
                Assertions.assertEquals(keyValueStore.get("b"), "barUpdated");
                Assertions.assertEquals(keyValueStore.get("c"), "bazUpdated");
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }
}
