/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.suppress;

import java.time.Duration;
import java.util.Collection;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KTableSuppressProcessorTest {
    private static final long ARBITRARY_LONG = 5L;
    private static final Change<Long> ARBITRARY_CHANGE = new Change((Object)7L, (Object)14L);

    @Test
    public void zeroTimeLimitShouldImmediatelyEmit() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ZERO, (Suppressed.BufferConfig)Suppressed.BufferConfig.unbounded()), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 5L;
        context.setRecordMetadata("", 0, 0L, null, 5L);
        String key = "hey";
        Change<Long> value = ARBITRARY_CHANGE;
        processor.process((Object)"hey", value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"hey", value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)5L));
    }

    @Test
    public void windowedZeroTimeLimitShouldImmediatelyEmit() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ZERO, (Suppressed.BufferConfig)Suppressed.BufferConfig.unbounded()), KTableSuppressProcessorTest.timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 5L;
        context.setRecordMetadata("", 0, 0L, null, 5L);
        Windowed key = new Windowed((Object)"hey", (Window)new TimeWindow(0L, 100L));
        Change<Long> value = ARBITRARY_CHANGE;
        processor.process((Object)key, value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)key, value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)5L));
    }

    @Test
    public void intermediateSuppressionShouldBufferAndEmitLater() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(1L), (Suppressed.BufferConfig)Suppressed.BufferConfig.unbounded()), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 0L;
        context.setRecordMetadata("topic", 0, 0L, null, 0L);
        String key = "hey";
        Change value = new Change(null, (Object)1L);
        processor.process((Object)"hey", value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(0));
        context.setRecordMetadata("topic", 0, 1L, null, 1L);
        processor.process((Object)"tick", new Change(null, null));
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"hey", (Object)value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)0L));
    }

    @Test
    public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
        Harness harness = new Harness((Suppressed<Windowed<String>>)KTableSuppressProcessorTest.finalResults(Duration.ofMillis(1L)), KTableSuppressProcessorTest.timeWindowedSerdeFrom(String.class, 1L), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long windowStart = 99L;
        long recordTime = 99L;
        long windowEnd = 100L;
        context.setRecordMetadata("topic", 0, 0L, null, 99L);
        Windowed key = new Windowed((Object)"hey", (Window)new TimeWindow(99L, 100L));
        Change<Long> value = ARBITRARY_CHANGE;
        processor.process((Object)key, value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(0));
        long windowStart2 = 100L;
        long recordTime2 = 100L;
        long windowEnd2 = 101L;
        context.setRecordMetadata("topic", 0, 1L, null, 100L);
        processor.process((Object)new Windowed((Object)"dummyKey1", (Window)new TimeWindow(100L, 101L)), ARBITRARY_CHANGE);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(0));
        long windowStart3 = 101L;
        long recordTime3 = 101L;
        long windowEnd3 = 102L;
        context.setRecordMetadata("topic", 0, 1L, null, 101L);
        processor.process((Object)new Windowed((Object)"dummyKey2", (Window)new TimeWindow(101L, 102L)), ARBITRARY_CHANGE);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)key, value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)99L));
    }

    @Test
    public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
        Harness harness = new Harness((Suppressed<Windowed<String>>)KTableSuppressProcessorTest.finalResults(Duration.ofMillis(0L)), KTableSuppressProcessorTest.timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 5L;
        long windowEnd = 100L;
        context.setRecordMetadata("", 0, 0L, null, 5L);
        Windowed key = new Windowed((Object)"hey", (Window)new TimeWindow(0L, 100L));
        Change<Long> value = ARBITRARY_CHANGE;
        processor.process((Object)key, value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(0));
        context.setRecordMetadata("", 0, 1L, null, 100L);
        processor.process((Object)new Windowed((Object)"dummyKey", (Window)new TimeWindow(100L, 200L)), ARBITRARY_CHANGE);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)key, value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)5L));
    }

    @Test
    public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
        Harness harness = new Harness((Suppressed<Windowed<String>>)KTableSuppressProcessorTest.finalResults(Duration.ofMillis(0L)), KTableSuppressProcessorTest.timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        Windowed key = new Windowed((Object)"hey", (Window)new TimeWindow(0L, 100L));
        Change<Long> value = ARBITRARY_CHANGE;
        processor.process((Object)key, value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)key, value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)100L));
    }

    @Test
    public void finalResultsShouldDropTombstonesForTimeWindows() {
        Harness harness = new Harness((Suppressed<Windowed<String>>)KTableSuppressProcessorTest.finalResults(Duration.ofMillis(0L)), KTableSuppressProcessorTest.timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        Windowed key = new Windowed((Object)"hey", (Window)new TimeWindow(0L, 100L));
        Change value = new Change(null, (Object)5L);
        processor.process((Object)key, value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(0));
    }

    @Test
    public void finalResultsShouldDropTombstonesForSessionWindows() {
        Harness harness = new Harness(KTableSuppressProcessorTest.finalResults(Duration.ofMillis(0L)), WindowedSerdes.sessionWindowedSerdeFrom(String.class), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        Windowed key = new Windowed((Object)"hey", (Window)new SessionWindow(0L, 0L));
        Change value = new Change(null, (Object)5L);
        processor.process((Object)key, value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(0));
    }

    @Test
    public void suppressShouldNotDropTombstonesForTimeWindows() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(0L), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)0L)), KTableSuppressProcessorTest.timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        Windowed key = new Windowed((Object)"hey", (Window)new TimeWindow(0L, 100L));
        Change value = new Change(null, (Object)5L);
        processor.process((Object)key, value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)key, (Object)value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)100L));
    }

    @Test
    public void suppressShouldNotDropTombstonesForSessionWindows() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(0L), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)0L)), WindowedSerdes.sessionWindowedSerdeFrom(String.class), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        Windowed key = new Windowed((Object)"hey", (Window)new SessionWindow(0L, 0L));
        Change value = new Change(null, (Object)5L);
        processor.process((Object)key, value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)key, (Object)value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)100L));
    }

    @Test
    public void suppressShouldNotDropTombstonesForKTable() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(0L), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)0L)), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        String key = "hey";
        Change value = new Change(null, (Object)5L);
        processor.process((Object)"hey", value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"hey", (Object)value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)100L));
    }

    @Test
    public void suppressShouldEmitWhenOverRecordCapacity() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ofDays(100L), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)1L)), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        String key = "hey";
        Change value = new Change(null, (Object)5L);
        processor.process((Object)"hey", value);
        context.setRecordMetadata("", 0, 1L, null, 101L);
        processor.process((Object)"dummyKey", value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"hey", (Object)value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)100L));
    }

    @Test
    public void suppressShouldEmitWhenOverByteCapacity() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ofDays(100L), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxBytes((long)60L)), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        String key = "hey";
        Change value = new Change(null, (Object)5L);
        processor.process((Object)"hey", value);
        context.setRecordMetadata("", 0, 1L, null, 101L);
        processor.process((Object)"dummyKey", value);
        MatcherAssert.assertThat((Object)context.forwarded(), KTableSuppressProcessorTest.hasSize(1));
        MockProcessorContext.CapturedForward capturedForward = (MockProcessorContext.CapturedForward)context.forwarded().get(0);
        MatcherAssert.assertThat((Object)capturedForward.keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"hey", (Object)value)));
        MatcherAssert.assertThat((Object)capturedForward.timestamp(), (Matcher)CoreMatchers.is((Object)100L));
    }

    @Test
    public void suppressShouldShutDownWhenOverRecordCapacity() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ofDays(100L), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)1L).shutDownWhenFull()), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        context.setCurrentNode(new ProcessorNode("testNode"));
        String key = "hey";
        Change value = new Change(null, (Object)5L);
        processor.process((Object)"hey", value);
        context.setRecordMetadata("", 0, 1L, null, 100L);
        try {
            processor.process((Object)"dummyKey", value);
            Assert.fail((String)"expected an exception");
        }
        catch (StreamsException e) {
            MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.containsString((String)"buffer exceeded its max capacity"));
        }
    }

    @Test
    public void suppressShouldShutDownWhenOverByteCapacity() {
        Harness harness = new Harness(Suppressed.untilTimeLimit((Duration)Duration.ofDays(100L), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxBytes((long)60L).shutDownWhenFull()), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext context = harness.context;
        KTableSuppressProcessor processor = harness.processor;
        long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L, null, 100L);
        context.setCurrentNode(new ProcessorNode("testNode"));
        String key = "hey";
        Change value = new Change(null, (Object)5L);
        processor.process((Object)"hey", value);
        context.setRecordMetadata("", 0, 1L, null, 100L);
        try {
            processor.process((Object)"dummyKey", value);
            Assert.fail((String)"expected an exception");
        }
        catch (StreamsException e) {
            MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.containsString((String)"buffer exceeded its max capacity"));
        }
    }

    private static <K extends Windowed> SuppressedInternal<K> finalResults(Duration grace) {
        return ((FinalResultsSuppressionBuilder)Suppressed.untilWindowCloses((Suppressed.StrictBufferConfig)Suppressed.BufferConfig.unbounded())).buildFinalResultsSuppression(grace);
    }

    private static <E> Matcher<Collection<E>> hasSize(final int i) {
        return new BaseMatcher<Collection<E>>(){

            public void describeTo(Description description) {
                description.appendText("a collection of size " + i);
            }

            public boolean matches(Object item) {
                if (item == null) {
                    return false;
                }
                return ((Collection)item).size() == i;
            }
        };
    }

    private static <K> Serde<Windowed<K>> timeWindowedSerdeFrom(Class<K> rawType, long windowSize) {
        Serde kSerde = Serdes.serdeFrom(rawType);
        return new Serdes.WrapperSerde((Serializer)new TimeWindowedSerializer(kSerde.serializer()), (Deserializer)new TimeWindowedDeserializer(kSerde.deserializer(), windowSize));
    }

    private static class Harness<K, V> {
        private final KTableSuppressProcessor<K, V> processor;
        private final MockInternalProcessorContext context;

        Harness(Suppressed<K> suppressed, Serde<K> keySerde, Serde<V> valueSerde) {
            String storeName = "test-store";
            StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder("test-store", keySerde, (Serde)FullChangeSerde.castOrWrap(valueSerde)).withLoggingDisabled().build();
            KTableSuppressProcessor processor = new KTableSuppressProcessor((SuppressedInternal)suppressed, "test-store");
            MockInternalProcessorContext context = new MockInternalProcessorContext();
            buffer.init((ProcessorContext)context, buffer);
            processor.init((ProcessorContext)context);
            this.processor = processor;
            this.context = context;
        }
    }
}

