package org.apache.kafka.streams.examples.docs;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.class */
public class DeveloperGuideTesting {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Long> inputTopic;
    private TestOutputTopic<String, Long> outputTopic;
    private KeyValueStore<String, Long> store;
    private Serde<String> stringSerde = new Serdes.StringSerde();
    private Serde<Long> longSerde = new Serdes.LongSerde();

    /* loaded from: input_file:org/apache/kafka/streams/examples/docs/DeveloperGuideTesting$CustomMaxAggregator.class */
    public class CustomMaxAggregator implements Processor<String, Long> {
        ProcessorContext context;
        private KeyValueStore<String, Long> store;

        public CustomMaxAggregator() {
        }

        public void init(ProcessorContext processorContext) {
            this.context = processorContext;
            processorContext.schedule(Duration.ofSeconds(60L), PunctuationType.WALL_CLOCK_TIME, j -> {
                flushStore();
            });
            processorContext.schedule(Duration.ofSeconds(10L), PunctuationType.STREAM_TIME, j2 -> {
                flushStore();
            });
            this.store = processorContext.getStateStore("aggStore");
        }

        public void process(String str, Long l) {
            Long l2 = (Long) this.store.get(str);
            if (l2 == null || l.longValue() > l2.longValue()) {
                this.store.put(str, l);
            }
        }

        private void flushStore() {
            KeyValueIterator all = this.store.all();
            while (all.hasNext()) {
                KeyValue keyValue = (KeyValue) all.next();
                this.context.forward(keyValue.key, keyValue.value);
            }
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/examples/docs/DeveloperGuideTesting$CustomMaxAggregatorSupplier.class */
    public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
        public CustomMaxAggregatorSupplier() {
        }

        public Processor<String, Long> get() {
            return new CustomMaxAggregator();
        }
    }

    @Before
    public void setup() {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", new String[]{"input-topic"});
        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), new String[]{"sourceProcessor"});
        topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("aggStore"), Serdes.String(), Serdes.Long()).withLoggingDisabled(), new String[]{"aggregator"});
        topology.addSink("sinkProcessor", "result-topic", new String[]{"aggregator"});
        Properties properties = new Properties();
        properties.setProperty("application.id", "maxAggregation");
        properties.setProperty("bootstrap.servers", "dummy:1234");
        properties.setProperty("default.key.serde", Serdes.String().getClass().getName());
        properties.setProperty("default.value.serde", Serdes.Long().getClass().getName());
        this.testDriver = new TopologyTestDriver(topology, properties);
        this.inputTopic = this.testDriver.createInputTopic("input-topic", this.stringSerde.serializer(), this.longSerde.serializer());
        this.outputTopic = this.testDriver.createOutputTopic("result-topic", this.stringSerde.deserializer(), this.longSerde.deserializer());
        this.store = this.testDriver.getKeyValueStore("aggStore");
        this.store.put("a", 21L);
    }

    @After
    public void tearDown() {
        this.testDriver.close();
    }

    @Test
    public void shouldFlushStoreForFirstInput() {
        this.inputTopic.pipeInput("a", 1L);
        MatcherAssert.assertThat(this.outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue("a", 21L)));
        MatcherAssert.assertThat(Boolean.valueOf(this.outputTopic.isEmpty()), Is.is(true));
    }

    @Test
    public void shouldNotUpdateStoreForSmallerValue() {
        this.inputTopic.pipeInput("a", 1L);
        MatcherAssert.assertThat(this.store.get("a"), CoreMatchers.equalTo(21L));
        MatcherAssert.assertThat(this.outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue("a", 21L)));
        MatcherAssert.assertThat(Boolean.valueOf(this.outputTopic.isEmpty()), Is.is(true));
    }

    @Test
    public void shouldNotUpdateStoreForLargerValue() {
        this.inputTopic.pipeInput("a", 42L);
        MatcherAssert.assertThat(this.store.get("a"), CoreMatchers.equalTo(42L));
        MatcherAssert.assertThat(this.outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue("a", 42L)));
        MatcherAssert.assertThat(Boolean.valueOf(this.outputTopic.isEmpty()), Is.is(true));
    }

    @Test
    public void shouldUpdateStoreForNewKey() {
        this.inputTopic.pipeInput("b", 21L);
        MatcherAssert.assertThat(this.store.get("b"), CoreMatchers.equalTo(21L));
        MatcherAssert.assertThat(this.outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue("a", 21L)));
        MatcherAssert.assertThat(this.outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue("b", 21L)));
        MatcherAssert.assertThat(Boolean.valueOf(this.outputTopic.isEmpty()), Is.is(true));
    }

    @Test
    public void shouldPunctuateIfEvenTimeAdvances() {
        Instant now = Instant.now();
        this.inputTopic.pipeInput("a", 1L, now);
        MatcherAssert.assertThat(this.outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue("a", 21L)));
        this.inputTopic.pipeInput("a", 1L, now);
        MatcherAssert.assertThat(Boolean.valueOf(this.outputTopic.isEmpty()), Is.is(true));
        this.inputTopic.pipeInput("a", 1L, now.plusSeconds(10L));
        MatcherAssert.assertThat(this.outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue("a", 21L)));
        MatcherAssert.assertThat(Boolean.valueOf(this.outputTopic.isEmpty()), Is.is(true));
    }

    @Test
    public void shouldPunctuateIfWallClockTimeAdvances() {
        this.testDriver.advanceWallClockTime(Duration.ofSeconds(60L));
        MatcherAssert.assertThat(this.outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue("a", 21L)));
        MatcherAssert.assertThat(Boolean.valueOf(this.outputTopic.isEmpty()), Is.is(true));
    }
}
