/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.examples.docs;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Exercises a small Processor API topology with {@link TopologyTestDriver}.
 *
 * <p>The topology built in {@link #setup()} reads {@code input-topic}, feeds every record to a
 * {@link CustomMaxAggregator} backed by the in-memory store {@code aggStore}, and writes whatever
 * the aggregator forwards to {@code result-topic}. Before each test the store is seeded with
 * {@code a -> 21}.
 */
public class DeveloperGuideTesting {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Long> inputTopic;
    private TestOutputTopic<String, Long> outputTopic;
    private KeyValueStore<String, Long> store;
    private final Serde<String> stringSerde = new Serdes.StringSerde();
    private final Serde<Long> longSerde = new Serdes.LongSerde();

    /**
     * Builds the topology, creates the test driver with its input/output topics, and
     * pre-populates the state store with {@code a -> 21}.
     */
    @BeforeEach
    public void setup() {
        final Topology topology = new Topology();
        topology.addSource("sourceProcessor", "input-topic");
        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("aggStore"),
                Serdes.String(),
                Serdes.Long()).withLoggingDisabled(),
            "aggregator");
        topology.addSink("sinkProcessor", "result-topic", "aggregator");

        final Properties props = new Properties();
        props.setProperty("default.key.serde", Serdes.StringSerde.class.getName());
        props.setProperty("default.value.serde", Serdes.LongSerde.class.getName());

        testDriver = new TopologyTestDriver(topology, props);
        inputTopic = testDriver.createInputTopic(
            "input-topic", stringSerde.serializer(), longSerde.serializer());
        outputTopic = testDriver.createOutputTopic(
            "result-topic", stringSerde.deserializer(), longSerde.deserializer());

        // Seed the store so every test starts from a known maximum for key "a".
        store = testDriver.getKeyValueStore("aggStore");
        store.put("a", 21L);
    }

    /** Closes the test driver created in {@link #setup()}. */
    @AfterEach
    public void tearDown() {
        testDriver.close();
    }

    /**
     * Pipes a single record and expects exactly one output record carrying the seeded
     * store content {@code a -> 21}.
     */
    @Test
    public void shouldFlushStoreForFirstInput() {
        inputTopic.pipeInput("a", 1L);
        MatcherAssert.assertThat(outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue<>("a", 21L)));
        MatcherAssert.assertThat(outputTopic.isEmpty(), Is.is(true));
    }

    /** A value smaller than the stored one must leave the store at {@code a -> 21}. */
    @Test
    public void shouldNotUpdateStoreForSmallerValue() {
        inputTopic.pipeInput("a", 1L);
        MatcherAssert.assertThat(store.get("a"), CoreMatchers.equalTo(21L));
        MatcherAssert.assertThat(outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue<>("a", 21L)));
        MatcherAssert.assertThat(outputTopic.isEmpty(), Is.is(true));
    }

    /**
     * A value larger than the stored one replaces it: the store and the output both show
     * {@code a -> 42}.
     *
     * <p>NOTE(review): the method name says "NotUpdate" but the assertions verify that the
     * store IS updated. The name is kept unchanged so existing test filters keep working.
     */
    @Test
    public void shouldNotUpdateStoreForLargerValue() {
        inputTopic.pipeInput("a", 42L);
        MatcherAssert.assertThat(store.get("a"), CoreMatchers.equalTo(42L));
        MatcherAssert.assertThat(outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue<>("a", 42L)));
        MatcherAssert.assertThat(outputTopic.isEmpty(), Is.is(true));
    }

    /**
     * A record for an unseen key is stored, and the output then contains both the seeded
     * entry {@code a -> 21} and the new entry {@code b -> 21}, in that order.
     */
    @Test
    public void shouldUpdateStoreForNewKey() {
        inputTopic.pipeInput("b", 21L);
        MatcherAssert.assertThat(store.get("b"), CoreMatchers.equalTo(21L));
        MatcherAssert.assertThat(outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue<>("a", 21L)));
        MatcherAssert.assertThat(outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue<>("b", 21L)));
        MatcherAssert.assertThat(outputTopic.isEmpty(), Is.is(true));
    }

    /**
     * Verifies output as record timestamps advance: the first record yields one output, a
     * second record with the same timestamp yields none, and a record 10 seconds later
     * yields one output again.
     *
     * <p>NOTE(review): "Even" in the method name is presumably a typo for "Event"; the name
     * is kept unchanged for compatibility.
     */
    @Test
    public void shouldPunctuateIfEvenTimeAdvances() {
        final Instant recordTime = Instant.now();

        inputTopic.pipeInput("a", 1L, recordTime);
        MatcherAssert.assertThat(outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue<>("a", 21L)));

        // Same timestamp again: no additional output is expected.
        inputTopic.pipeInput("a", 1L, recordTime);
        MatcherAssert.assertThat(outputTopic.isEmpty(), Is.is(true));

        // 10 seconds later matches the stream-time schedule interval of the aggregator.
        inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L));
        MatcherAssert.assertThat(outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue<>("a", 21L)));
        MatcherAssert.assertThat(outputTopic.isEmpty(), Is.is(true));
    }

    /**
     * Advancing the driver's wall-clock time by 60 seconds, without piping any record,
     * must produce exactly one output record with the seeded entry {@code a -> 21}.
     */
    @Test
    public void shouldPunctuateIfWallClockTimeAdvances() {
        testDriver.advanceWallClockTime(Duration.ofSeconds(60L));
        MatcherAssert.assertThat(outputTopic.readKeyValue(), CoreMatchers.equalTo(new KeyValue<>("a", 21L)));
        MatcherAssert.assertThat(outputTopic.isEmpty(), Is.is(true));
    }

    /**
     * Processor that keeps, per key, the largest value seen so far in the state store
     * {@code aggStore} and forwards the full store content from its scheduled punctuators.
     */
    public static class CustomMaxAggregator
    implements Processor<String, Long, String, Long> {
        ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        /**
         * Remembers the context, schedules {@link #flushStore(long)} every 60 seconds of
         * wall-clock time and every 10 seconds of stream time, and looks up {@code aggStore}.
         *
         * @param context the processor context supplied by the runtime
         */
        @Override
        public void init(final ProcessorContext<String, Long> context) {
            this.context = context;
            context.schedule(Duration.ofSeconds(60L), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
            context.schedule(Duration.ofSeconds(10L), PunctuationType.STREAM_TIME, this::flushStore);
            store = context.getStateStore("aggStore");
        }

        /**
         * Stores the record's value when the key is new or the value exceeds the stored one.
         *
         * @param record the incoming record; its value is unboxed for the comparison
         */
        @Override
        public void process(final Record<String, Long> record) {
            final Long oldValue = store.get(record.key());
            if (oldValue == null || record.value() > oldValue) {
                store.put(record.key(), record.value());
            }
        }

        /**
         * Forwards every entry currently in the store as a record stamped with
         * {@code timestamp}. The iterator is closed by the try-with-resources statement.
         *
         * @param timestamp the timestamp passed in by the punctuation callback
         */
        private void flushStore(final long timestamp) {
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Long> next = it.next();
                    context.forward(new Record<>(next.key, next.value, timestamp));
                }
            }
        }
    }

    /** Supplies a fresh {@link CustomMaxAggregator} for each call to {@link #get()}. */
    public static class CustomMaxAggregatorSupplier
    implements ProcessorSupplier<String, Long, String, Long> {
        /**
         * @return a new {@link CustomMaxAggregator} instance
         */
        @Override
        public Processor<String, Long, String, Long> get() {
            return new CustomMaxAggregator();
        }
    }
}

