package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* Decompiled from org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.class */
public class KStreamTransformIntegrationTest {
    /** Builder of the topology under test; replaced with a fresh instance in {@link #before()}. */
    private StreamsBuilder builder;
    /** Input topic name; the same literal is passed to the builder and the test driver. */
    private final String topic = "stream";
    /** State store name; the same literal is used by the transformers and the store builder. */
    private final String stateStoreName = "myTransformState";
    /** Every record that reaches the end of the topology, in arrival order. */
    private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
    /** Terminal action that appends each record it receives to {@link #results}. */
    private final ForeachAction<Integer, Integer> accumulateExpected =
        (key, value) -> results.add(KeyValue.pair(key, value));
    /** Source stream created in {@link #before()}; each test attaches its transformer to it. */
    private KStream<Integer, Integer> stream;

    /**
     * Stateful transformer that fans every input record out into three records.
     *
     * <p>A per-key counter is kept in the "myTransformState" store. For the i-th emitted record
     * (i = 0, 1, 2) the key is {@code key + i} and the value is {@code value + counter}, with the
     * counter incremented after each emission and written back once all three are produced.
     */
    private class TestFlatTransformer implements Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> {
        private KeyValueStore<Integer, Integer> state;

        private TestFlatTransformer() {
        }

        /** Looks up the counter store from the processor context. */
        public void init(ProcessorContext context) {
            this.state = context.getStateStore("myTransformState");
        }

        /**
         * Emits three records derived from the input record and advances the key's counter by three.
         *
         * @param key   key of the input record; also the store key of the counter
         * @param value value of the input record
         * @return the three records to forward downstream, in emission order
         */
        public Iterable<KeyValue<Integer, Integer>> transform(Integer key, Integer value) {
            final List<KeyValue<Integer, Integer>> emitted = new ArrayList<>();
            // Seed the counter on first sight of the key so the read below never returns null.
            state.putIfAbsent(key, 0);
            int counter = state.get(key);
            for (int i = 0; i < 3; i++) {
                emitted.add(new KeyValue<>(key + i, value + counter));
                counter++;
            }
            state.put(key, counter);
            return emitted;
        }

        public void close() {
        }
    }

    /**
     * Stateful value transformer that turns every input value into three output values.
     *
     * <p>A counter is kept in the "myTransformState" store, keyed by the argument passed to
     * {@link #transform(Integer)}. Each call emits the next three counter values and stores the
     * last one.
     */
    private class TestFlatValueTransformer implements ValueTransformer<Integer, Iterable<Integer>> {
        private KeyValueStore<Integer, Integer> state;

        private TestFlatValueTransformer() {
        }

        /** Looks up the counter store from the processor context. */
        public void init(ProcessorContext context) {
            this.state = context.getStateStore("myTransformState");
        }

        /**
         * Emits the next three counter values for the given input.
         *
         * @param value the input value; also used as the store key of the counter
         * @return the three incremented counter values, in ascending order
         */
        public Iterable<Integer> transform(Integer value) {
            final List<Integer> emitted = new ArrayList<>();
            // Seed the counter on first sight so the read below never returns null.
            state.putIfAbsent(value, 0);
            int counter = state.get(value);
            for (int i = 0; i < 3; i++) {
                counter++;
                emitted.add(counter);
            }
            state.put(value, counter);
            return emitted;
        }

        public void close() {
        }
    }

    /**
     * Stateful one-to-one transformer backed by the "myTransformState" store.
     *
     * <p>Each input record {@code (key, value)} becomes {@code (key + 1, value + counter)}, where
     * {@code counter} is the number stored for {@code key}; the stored number is then increased by one.
     */
    private class TestTransformer implements Transformer<Integer, Integer, KeyValue<Integer, Integer>> {
        private KeyValueStore<Integer, Integer> state;

        private TestTransformer() {
        }

        /** Looks up the counter store from the processor context. */
        public void init(ProcessorContext context) {
            state = context.getStateStore("myTransformState");
        }

        /**
         * @param key   key of the input record; also the store key of the counter
         * @param value value of the input record
         * @return the record {@code (key + 1, value + counter)}
         */
        public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
            // Seed the counter on first sight of the key so the read below never returns null.
            state.putIfAbsent(key, 0);
            final int counter = state.get(key);
            final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + counter);
            state.put(key, counter + 1);
            return result;
        }

        public void close() {
        }
    }

    /**
     * Stateful value transformer backed by the "myTransformState" store.
     *
     * <p>A counter is stored under the argument passed to {@link #transform(Integer)}; each call
     * increments that counter, stores it, and returns the new count.
     */
    private class TestValueTransformer implements ValueTransformer<Integer, Integer> {
        private KeyValueStore<Integer, Integer> state;

        private TestValueTransformer() {
        }

        /** Looks up the counter store from the processor context. */
        public void init(ProcessorContext context) {
            state = context.getStateStore("myTransformState");
        }

        /**
         * @param value the input value; also used as the store key of the counter
         * @return the counter for {@code value} after incrementing it
         */
        public Integer transform(Integer value) {
            // Seed the counter on first sight so the read below never returns null.
            state.putIfAbsent(value, 0);
            final Integer next = state.get(value) + 1;
            state.put(value, next);
            return next;
        }

        public void close() {
        }
    }

    /**
     * Stateful key-aware value transformer backed by the "myTransformState" store.
     *
     * <p>Each call returns {@code value + counter}, where {@code counter} is the number stored for
     * the record key, and then increases the stored number by one.
     */
    private class TestValueTransformerWithKey implements ValueTransformerWithKey<Integer, Integer, Integer> {
        private KeyValueStore<Integer, Integer> state;

        private TestValueTransformerWithKey() {
        }

        /** Looks up the counter store from the processor context. */
        public void init(ProcessorContext context) {
            state = context.getStateStore("myTransformState");
        }

        /**
         * @param key   key of the input record; also the store key of the counter
         * @param value value of the input record
         * @return {@code value} plus the counter held for {@code key} before this call
         */
        public Integer transform(Integer key, Integer value) {
            // Seed the counter on first sight of the key so the read below never returns null.
            state.putIfAbsent(key, 0);
            final int counter = state.get(key);
            final Integer result = value + counter;
            state.put(key, counter + 1);
            return result;
        }

        public void close() {
        }
    }

    /**
     * Stateful key-aware value transformer that turns every input value into three output values.
     *
     * <p>NOTE: despite its name this class implements {@link ValueTransformerWithKey}; it is the
     * flat (one-to-many) counterpart of the key-aware value transformer.
     *
     * <p>A per-key counter is kept in the "myTransformState" store. Each call emits
     * {@code value + counter} three times, incrementing the counter after each emission, and
     * writes the counter back once all three are produced.
     */
    private class TestValueTransformerWithoutKey implements ValueTransformerWithKey<Integer, Integer, Iterable<Integer>> {
        private KeyValueStore<Integer, Integer> state;

        private TestValueTransformerWithoutKey() {
        }

        /** Looks up the counter store from the processor context. */
        public void init(ProcessorContext context) {
            this.state = context.getStateStore("myTransformState");
        }

        /**
         * Emits three values derived from the input value and advances the key's counter by three.
         *
         * @param key   key of the input record; also the store key of the counter
         * @param value value of the input record
         * @return the three values to forward downstream, in emission order
         */
        public Iterable<Integer> transform(Integer key, Integer value) {
            final List<Integer> emitted = new ArrayList<>();
            // Seed the counter on first sight of the key so the read below never returns null.
            state.putIfAbsent(key, 0);
            int counter = state.get(key);
            for (int i = 0; i < 3; i++) {
                emitted.add(value + counter);
                counter++;
            }
            state.put(key, counter);
            return emitted;
        }

        public void close() {
        }
    }

    /**
     * Creates a fresh {@link StreamsBuilder} and an Integer/Integer source stream on topic "stream"
     * before every test.
     */
    @Before
    public void before() {
        final StreamsBuilder freshBuilder = new StreamsBuilder();
        builder = freshBuilder;
        stream = freshBuilder.stream(
            "stream",
            Consumed.with(Serdes.Integer(), Serdes.Integer()));
    }

    /**
     * Creates a builder for the persistent Integer/Integer key-value store named
     * "myTransformState". Public because the anonymous supplier classes in this file call it.
     *
     * @return a new store builder on every call
     */
    public StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder() {
        final StoreBuilder<KeyValueStore<Integer, Integer>> counterStore = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("myTransformState"),
            Serdes.Integer(),
            Serdes.Integer());
        return counterStore;
    }

    /**
     * Runs the topology built so far against a fixed input and checks what reached the end of it.
     *
     * <p>Six records are piped into topic "stream": (1,1), (2,2), (3,3), (2,1), (2,3), (1,3).
     * The records accumulated in {@code results} are then compared with {@code expected}.
     *
     * @param expected the records the topology is expected to emit, in order
     */
    private void verifyResult(List<KeyValue<Integer, Integer>> expected) {
        final List<KeyValue<Integer, Integer>> input = Arrays.asList(
            new KeyValue<>(1, 1),
            new KeyValue<>(2, 2),
            new KeyValue<>(3, 3),
            new KeyValue<>(2, 1),
            new KeyValue<>(2, 3),
            new KeyValue<>(1, 3));

        try (TopologyTestDriver driver = new TopologyTestDriver(
                this.builder.build(),
                StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer()))) {
            driver.createInputTopic("stream", new IntegerSerializer(), new IntegerSerializer())
                .pipeKeyValueList(input);
        }

        // Assert only after the driver is closed, so a failing assertion never touches the driver.
        MatcherAssert.assertThat(this.results, IsEqual.equalTo(expected));
    }

    /**
     * {@code transform()} with a store registered on the builder and connected by name:
     * every record becomes {@code (key + 1, value + counter)}.
     */
    @Test
    public void shouldTransform() {
        builder.addStateStore(storeBuilder());

        final KStream<Integer, Integer> transformed =
            stream.transform(() -> new TestTransformer(), "myTransformState");
        transformed.foreach(accumulateExpected);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(2, 1),
            KeyValue.pair(3, 2),
            KeyValue.pair(4, 3),
            KeyValue.pair(3, 2),
            KeyValue.pair(3, 5),
            KeyValue.pair(2, 4));
        verifyResult(expected);
    }

    /**
     * Same scenario as {@code shouldTransform}, but the store builder is handed over through the
     * supplier's {@code stores()} method and no store names are passed to {@code transform()}.
     */
    @Test
    public void shouldTransformWithConnectedStoreProvider() {
        final TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>> supplier =
            new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
                public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
                    return new TestTransformer();
                }

                public Set<StoreBuilder<?>> stores() {
                    return Collections.singleton(storeBuilder());
                }
            };
        stream.transform(supplier).foreach(accumulateExpected);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(2, 1),
            KeyValue.pair(3, 2),
            KeyValue.pair(4, 3),
            KeyValue.pair(3, 2),
            KeyValue.pair(3, 5),
            KeyValue.pair(2, 4));
        verifyResult(expected);
    }

    /**
     * {@code flatTransform()} with a store registered on the builder and connected by name:
     * every input record fans out into three records.
     */
    @Test
    public void shouldFlatTransform() {
        builder.addStateStore(storeBuilder());

        final KStream<Integer, Integer> transformed =
            stream.flatTransform(() -> new TestFlatTransformer(), "myTransformState");
        transformed.foreach(accumulateExpected);

        // One row per input record.
        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1), KeyValue.pair(2, 2), KeyValue.pair(3, 3),
            KeyValue.pair(2, 2), KeyValue.pair(3, 3), KeyValue.pair(4, 4),
            KeyValue.pair(3, 3), KeyValue.pair(4, 4), KeyValue.pair(5, 5),
            KeyValue.pair(2, 4), KeyValue.pair(3, 5), KeyValue.pair(4, 6),
            KeyValue.pair(2, 9), KeyValue.pair(3, 10), KeyValue.pair(4, 11),
            KeyValue.pair(1, 6), KeyValue.pair(2, 7), KeyValue.pair(3, 8));
        verifyResult(expected);
    }

    /**
     * Same scenario as {@code shouldFlatTransform}, but the store builder is handed over through
     * the supplier's {@code stores()} method and no store names are passed to {@code flatTransform()}.
     */
    @Test
    public void shouldFlatTransformWithConnectedStoreProvider() {
        final TransformerSupplier<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> supplier =
            new TransformerSupplier<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
                public Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> get() {
                    return new TestFlatTransformer();
                }

                public Set<StoreBuilder<?>> stores() {
                    return Collections.singleton(storeBuilder());
                }
            };
        stream.flatTransform(supplier).foreach(accumulateExpected);

        // One row per input record.
        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1), KeyValue.pair(2, 2), KeyValue.pair(3, 3),
            KeyValue.pair(2, 2), KeyValue.pair(3, 3), KeyValue.pair(4, 4),
            KeyValue.pair(3, 3), KeyValue.pair(4, 4), KeyValue.pair(5, 5),
            KeyValue.pair(2, 4), KeyValue.pair(3, 5), KeyValue.pair(4, 6),
            KeyValue.pair(2, 9), KeyValue.pair(3, 10), KeyValue.pair(4, 11),
            KeyValue.pair(1, 6), KeyValue.pair(2, 7), KeyValue.pair(3, 8));
        verifyResult(expected);
    }

    /**
     * {@code transformValues()} with a key-aware value transformer and a store registered on the
     * builder: keys are unchanged and each value becomes {@code value + counter}.
     */
    @Test
    public void shouldTransformValuesWithValueTransformerWithKey() {
        builder.addStateStore(storeBuilder());

        final KStream<Integer, Integer> transformed =
            stream.transformValues(() -> new TestValueTransformerWithKey(), "myTransformState");
        transformed.foreach(accumulateExpected);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(3, 3),
            KeyValue.pair(2, 2),
            KeyValue.pair(2, 5),
            KeyValue.pair(1, 4));
        verifyResult(expected);
    }

    /**
     * {@code transformValues()} with a key-aware value transformer whose store builder is handed
     * over through the supplier's {@code stores()} method; no store names are passed.
     *
     * <p>The expected output is the same as for the variant that registers the store on the
     * builder: keys are unchanged and each value becomes {@code value + counter}.
     */
    @Test
    public void shouldTransformValuesWithValueTransformerWithKeyWithConnectedStoreProvider() {
        this.stream.transformValues(new ValueTransformerWithKeySupplier<Integer, Integer, Integer>() {
            public ValueTransformerWithKey<Integer, Integer, Integer> get() {
                return new TestValueTransformerWithKey();
            }

            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(KStreamTransformIntegrationTest.this.storeBuilder());
            }
        }, new String[0]).foreach(this.accumulateExpected);

        // Without this call the test only wires the topology and asserts nothing.
        verifyResult(Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(3, 3),
            KeyValue.pair(2, 2),
            KeyValue.pair(2, 5),
            KeyValue.pair(1, 4)));
    }

    /**
     * {@code transformValues()} with a key-less value transformer and a store registered on the
     * builder: keys are unchanged and each value is replaced by how often that value has been seen.
     */
    @Test
    public void shouldTransformValuesWithValueTransformerWithoutKey() {
        builder.addStateStore(storeBuilder());

        final KStream<Integer, Integer> transformed =
            stream.transformValues(() -> new TestValueTransformer(), "myTransformState");
        transformed.foreach(accumulateExpected);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(2, 1),
            KeyValue.pair(3, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(2, 2),
            KeyValue.pair(1, 3));
        verifyResult(expected);
    }

    /**
     * Same scenario as {@code shouldTransformValuesWithValueTransformerWithoutKey}, but the store
     * builder is handed over through the supplier's {@code stores()} method and no store names
     * are passed to {@code transformValues()}.
     */
    @Test
    public void shouldTransformValuesWithValueTransformerWithoutKeyWithConnectedStoreProvider() {
        final ValueTransformerSupplier<Integer, Integer> supplier =
            new ValueTransformerSupplier<Integer, Integer>() {
                public ValueTransformer<Integer, Integer> get() {
                    return new TestValueTransformer();
                }

                public Set<StoreBuilder<?>> stores() {
                    return Collections.singleton(storeBuilder());
                }
            };
        stream.transformValues(supplier).foreach(accumulateExpected);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(2, 1),
            KeyValue.pair(3, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(2, 2),
            KeyValue.pair(1, 3));
        verifyResult(expected);
    }

    /**
     * {@code flatTransformValues()} with a key-aware value transformer and a store registered on
     * the builder: every input record fans out into three records with the original key.
     */
    @Test
    public void shouldFlatTransformValuesWithKey() {
        builder.addStateStore(storeBuilder());

        final KStream<Integer, Integer> transformed =
            stream.flatTransformValues(() -> new TestValueTransformerWithoutKey(), "myTransformState");
        transformed.foreach(accumulateExpected);

        // One row per input record.
        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1), KeyValue.pair(1, 2), KeyValue.pair(1, 3),
            KeyValue.pair(2, 2), KeyValue.pair(2, 3), KeyValue.pair(2, 4),
            KeyValue.pair(3, 3), KeyValue.pair(3, 4), KeyValue.pair(3, 5),
            KeyValue.pair(2, 4), KeyValue.pair(2, 5), KeyValue.pair(2, 6),
            KeyValue.pair(2, 9), KeyValue.pair(2, 10), KeyValue.pair(2, 11),
            KeyValue.pair(1, 6), KeyValue.pair(1, 7), KeyValue.pair(1, 8));
        verifyResult(expected);
    }

    /**
     * Same scenario as {@code shouldFlatTransformValuesWithKey}, but the store builder is handed
     * over through the supplier's {@code stores()} method and no store names are passed to
     * {@code flatTransformValues()}.
     */
    @Test
    public void shouldFlatTransformValuesWithKeyWithConnectedStoreProvider() {
        final ValueTransformerWithKeySupplier<Integer, Integer, Iterable<Integer>> supplier =
            new ValueTransformerWithKeySupplier<Integer, Integer, Iterable<Integer>>() {
                public ValueTransformerWithKey<Integer, Integer, Iterable<Integer>> get() {
                    return new TestValueTransformerWithoutKey();
                }

                public Set<StoreBuilder<?>> stores() {
                    return Collections.singleton(storeBuilder());
                }
            };
        stream.flatTransformValues(supplier).foreach(accumulateExpected);

        // One row per input record.
        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1), KeyValue.pair(1, 2), KeyValue.pair(1, 3),
            KeyValue.pair(2, 2), KeyValue.pair(2, 3), KeyValue.pair(2, 4),
            KeyValue.pair(3, 3), KeyValue.pair(3, 4), KeyValue.pair(3, 5),
            KeyValue.pair(2, 4), KeyValue.pair(2, 5), KeyValue.pair(2, 6),
            KeyValue.pair(2, 9), KeyValue.pair(2, 10), KeyValue.pair(2, 11),
            KeyValue.pair(1, 6), KeyValue.pair(1, 7), KeyValue.pair(1, 8));
        verifyResult(expected);
    }

    /**
     * {@code flatTransformValues()} with a key-less value transformer and a store registered on
     * the builder: every input record fans out into three records with the original key.
     */
    @Test
    public void shouldFlatTransformValuesWithValueTransformerWithoutKey() {
        builder.addStateStore(storeBuilder());

        final KStream<Integer, Integer> transformed =
            stream.flatTransformValues(() -> new TestFlatValueTransformer(), "myTransformState");
        transformed.foreach(accumulateExpected);

        // One row per input record.
        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1), KeyValue.pair(1, 2), KeyValue.pair(1, 3),
            KeyValue.pair(2, 1), KeyValue.pair(2, 2), KeyValue.pair(2, 3),
            KeyValue.pair(3, 1), KeyValue.pair(3, 2), KeyValue.pair(3, 3),
            KeyValue.pair(2, 4), KeyValue.pair(2, 5), KeyValue.pair(2, 6),
            KeyValue.pair(2, 4), KeyValue.pair(2, 5), KeyValue.pair(2, 6),
            KeyValue.pair(1, 7), KeyValue.pair(1, 8), KeyValue.pair(1, 9));
        verifyResult(expected);
    }

    /**
     * Same scenario as {@code shouldFlatTransformValuesWithValueTransformerWithoutKey}, but the
     * store builder is handed over through the supplier's {@code stores()} method and no store
     * names are passed to {@code flatTransformValues()}.
     */
    @Test
    public void shouldFlatTransformValuesWithValueTransformerWithoutKeyWithConnectedStoreProvider() {
        final ValueTransformerSupplier<Integer, Iterable<Integer>> supplier =
            new ValueTransformerSupplier<Integer, Iterable<Integer>>() {
                public ValueTransformer<Integer, Iterable<Integer>> get() {
                    return new TestFlatValueTransformer();
                }

                public Set<StoreBuilder<?>> stores() {
                    return Collections.singleton(storeBuilder());
                }
            };
        stream.flatTransformValues(supplier).foreach(accumulateExpected);

        // One row per input record.
        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1), KeyValue.pair(1, 2), KeyValue.pair(1, 3),
            KeyValue.pair(2, 1), KeyValue.pair(2, 2), KeyValue.pair(2, 3),
            KeyValue.pair(3, 1), KeyValue.pair(3, 2), KeyValue.pair(3, 3),
            KeyValue.pair(2, 4), KeyValue.pair(2, 5), KeyValue.pair(2, 6),
            KeyValue.pair(2, 4), KeyValue.pair(2, 5), KeyValue.pair(2, 6),
            KeyValue.pair(1, 7), KeyValue.pair(1, 8), KeyValue.pair(1, 9));
        verifyResult(expected);
    }
}
