/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.examples.docs;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class DeveloperGuideTesting {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Long> inputTopic;
    private TestOutputTopic<String, Long> outputTopic;
    private KeyValueStore<String, Long> store;
    private Serde<String> stringSerde = new Serdes.StringSerde();
    private Serde<Long> longSerde = new Serdes.LongSerde();

    @Before
    public void setup() {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", new String[]{"input-topic"});
        topology.addProcessor("aggregator", (ProcessorSupplier)new CustomMaxAggregatorSupplier(), new String[]{"sourceProcessor"});
        topology.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"aggStore"), (Serde)Serdes.String(), (Serde)Serdes.Long()).withLoggingDisabled(), new String[]{"aggregator"});
        topology.addSink("sinkProcessor", "result-topic", new String[]{"aggregator"});
        Properties props = new Properties();
        props.setProperty("application.id", "maxAggregation");
        props.setProperty("bootstrap.servers", "dummy:1234");
        props.setProperty("default.key.serde", Serdes.String().getClass().getName());
        props.setProperty("default.value.serde", Serdes.Long().getClass().getName());
        this.testDriver = new TopologyTestDriver(topology, props);
        this.inputTopic = this.testDriver.createInputTopic("input-topic", this.stringSerde.serializer(), this.longSerde.serializer());
        this.outputTopic = this.testDriver.createOutputTopic("result-topic", this.stringSerde.deserializer(), this.longSerde.deserializer());
        this.store = this.testDriver.getKeyValueStore("aggStore");
        this.store.put((Object)"a", (Object)21L);
    }

    @After
    public void tearDown() {
        this.testDriver.close();
    }

    @Test
    public void shouldFlushStoreForFirstInput() {
        this.inputTopic.pipeInput((Object)"a", (Object)1L);
        MatcherAssert.assertThat((Object)this.outputTopic.readKeyValue(), (Matcher)CoreMatchers.equalTo((Object)new KeyValue((Object)"a", (Object)21L)));
        MatcherAssert.assertThat((Object)this.outputTopic.isEmpty(), (Matcher)Is.is((Object)true));
    }

    @Test
    public void shouldNotUpdateStoreForSmallerValue() {
        this.inputTopic.pipeInput((Object)"a", (Object)1L);
        MatcherAssert.assertThat((Object)this.store.get((Object)"a"), (Matcher)CoreMatchers.equalTo((Object)21L));
        MatcherAssert.assertThat((Object)this.outputTopic.readKeyValue(), (Matcher)CoreMatchers.equalTo((Object)new KeyValue((Object)"a", (Object)21L)));
        MatcherAssert.assertThat((Object)this.outputTopic.isEmpty(), (Matcher)Is.is((Object)true));
    }

    @Test
    public void shouldNotUpdateStoreForLargerValue() {
        this.inputTopic.pipeInput((Object)"a", (Object)42L);
        MatcherAssert.assertThat((Object)this.store.get((Object)"a"), (Matcher)CoreMatchers.equalTo((Object)42L));
        MatcherAssert.assertThat((Object)this.outputTopic.readKeyValue(), (Matcher)CoreMatchers.equalTo((Object)new KeyValue((Object)"a", (Object)42L)));
        MatcherAssert.assertThat((Object)this.outputTopic.isEmpty(), (Matcher)Is.is((Object)true));
    }

    @Test
    public void shouldUpdateStoreForNewKey() {
        this.inputTopic.pipeInput((Object)"b", (Object)21L);
        MatcherAssert.assertThat((Object)this.store.get((Object)"b"), (Matcher)CoreMatchers.equalTo((Object)21L));
        MatcherAssert.assertThat((Object)this.outputTopic.readKeyValue(), (Matcher)CoreMatchers.equalTo((Object)new KeyValue((Object)"a", (Object)21L)));
        MatcherAssert.assertThat((Object)this.outputTopic.readKeyValue(), (Matcher)CoreMatchers.equalTo((Object)new KeyValue((Object)"b", (Object)21L)));
        MatcherAssert.assertThat((Object)this.outputTopic.isEmpty(), (Matcher)Is.is((Object)true));
    }

    @Test
    public void shouldPunctuateIfEvenTimeAdvances() {
        Instant recordTime = Instant.now();
        this.inputTopic.pipeInput((Object)"a", (Object)1L, recordTime);
        MatcherAssert.assertThat((Object)this.outputTopic.readKeyValue(), (Matcher)CoreMatchers.equalTo((Object)new KeyValue((Object)"a", (Object)21L)));
        this.inputTopic.pipeInput((Object)"a", (Object)1L, recordTime);
        MatcherAssert.assertThat((Object)this.outputTopic.isEmpty(), (Matcher)Is.is((Object)true));
        this.inputTopic.pipeInput((Object)"a", (Object)1L, recordTime.plusSeconds(10L));
        MatcherAssert.assertThat((Object)this.outputTopic.readKeyValue(), (Matcher)CoreMatchers.equalTo((Object)new KeyValue((Object)"a", (Object)21L)));
        MatcherAssert.assertThat((Object)this.outputTopic.isEmpty(), (Matcher)Is.is((Object)true));
    }

    @Test
    public void shouldPunctuateIfWallClockTimeAdvances() {
        this.testDriver.advanceWallClockTime(Duration.ofSeconds(60L));
        MatcherAssert.assertThat((Object)this.outputTopic.readKeyValue(), (Matcher)CoreMatchers.equalTo((Object)new KeyValue((Object)"a", (Object)21L)));
        MatcherAssert.assertThat((Object)this.outputTopic.isEmpty(), (Matcher)Is.is((Object)true));
    }

    public class CustomMaxAggregator
    implements Processor<String, Long> {
        ProcessorContext context;
        private KeyValueStore<String, Long> store;

        public void init(ProcessorContext context) {
            this.context = context;
            context.schedule(Duration.ofSeconds(60L), PunctuationType.WALL_CLOCK_TIME, time -> this.flushStore());
            context.schedule(Duration.ofSeconds(10L), PunctuationType.STREAM_TIME, time -> this.flushStore());
            this.store = (KeyValueStore)context.getStateStore("aggStore");
        }

        public void process(String key, Long value) {
            Long oldValue = (Long)this.store.get((Object)key);
            if (oldValue == null || value > oldValue) {
                this.store.put((Object)key, (Object)value);
            }
        }

        private void flushStore() {
            KeyValueIterator it = this.store.all();
            while (it.hasNext()) {
                KeyValue next = (KeyValue)it.next();
                this.context.forward(next.key, next.value);
            }
        }

        public void close() {
        }
    }

    public class CustomMaxAggregatorSupplier
    implements ProcessorSupplier<String, Long> {
        public Processor<String, Long> get() {
            return new CustomMaxAggregator();
        }
    }
}

