package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.class */
/**
 * Tests for {@link ChangeLoggingKeyValueStore}.
 *
 * <p>The store under test wraps an {@link InMemoryKeyValueStore} of raw bytes. Every record
 * handed to the record collector is captured in {@link #sent}, so each test can check both
 * what reached the inner store and what was forwarded to the collector.
 */
public class ChangeLoggingKeyValueStoreTest {
    private MockProcessorContext context;
    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
    private final Serde<String> keySerde = Serdes.String();
    private final Serde<String> valueSerde = Serdes.String();
    private final ChangeLoggingKeyValueStore<String, String> store = new ChangeLoggingKeyValueStore<>(inner, keySerde, valueSerde);
    /** Key/value pairs passed to the record collector; the most recent value per key wins. */
    private final Map<Object, Object> sent = new HashMap<>();
    private final String hi = "hi";
    private final Bytes hiBytes = Bytes.wrap(hi.getBytes(StandardCharsets.UTF_8));
    private final String there = "there";
    private final byte[] thereBytes = there.getBytes(StandardCharsets.UTF_8);
    private final String hello = "hello";
    private final String world = "world";

    /**
     * Builds a processor context whose record collector records every sent key/value pair in
     * {@link #sent}, then initialises the store against that context.
     */
    @Before
    public void before() {
        final NoOpRecordCollector collector = new NoOpRecordCollector() {
            @Override
            public <K, V> void send(final String topic,
                                    final K key,
                                    final V value,
                                    final Integer partition,
                                    final Long timestamp,
                                    final Serializer<K> keySerializer,
                                    final Serializer<V> valueSerializer) {
                // Capture instead of sending so tests can inspect what was logged.
                sent.put(key, value);
            }
        };
        context = new MockProcessorContext(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.Long(),
            collector,
            new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())));
        context.setTime(0L);
        store.init(context, store);
    }

    /** Closes the context and then the store after each test. */
    @After
    public void after() {
        context.close();
        store.close();
    }

    @Test
    public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
        store.put(hi, there);
        MatcherAssert.assertThat(deserializedValueFromInner(hi), CoreMatchers.equalTo(there));
    }

    @Test
    public void shouldLogChangeOnPut() {
        store.put(hi, there);
        MatcherAssert.assertThat((byte[]) sent.get(hiBytes), CoreMatchers.equalTo(thereBytes));
    }

    @Test
    public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
        store.putAll(Arrays.asList(KeyValue.pair(hello, world), KeyValue.pair(hi, there)));
        MatcherAssert.assertThat(deserializedValueFromInner(hello), CoreMatchers.equalTo(world));
        MatcherAssert.assertThat(deserializedValueFromInner(hi), CoreMatchers.equalTo(there));
    }

    @Test
    public void shouldLogChangesOnPutAll() {
        store.putAll(Arrays.asList(KeyValue.pair(hi, there), KeyValue.pair(hello, world)));
        MatcherAssert.assertThat((byte[]) sent.get(hiBytes), CoreMatchers.equalTo(thereBytes));
        MatcherAssert.assertThat(
            (byte[]) sent.get(Bytes.wrap(hello.getBytes(StandardCharsets.UTF_8))),
            CoreMatchers.equalTo(world.getBytes(StandardCharsets.UTF_8)));
    }

    @Test
    public void shouldPutNullOnDelete() {
        store.put(hi, there);
        store.delete(hi);
        MatcherAssert.assertThat(inner.get(hiBytes), CoreMatchers.nullValue());
    }

    @Test
    public void shouldReturnOldValueOnDelete() {
        store.put(hi, there);
        MatcherAssert.assertThat(store.delete(hi), CoreMatchers.equalTo(there));
    }

    @Test
    public void shouldReturnNullOnDeleteIfNoOldValue() {
        MatcherAssert.assertThat(store.delete(hi), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    @Test
    public void shouldLogKeyNullOnDelete() {
        store.put(hi, there);
        store.delete(hi);
        // Look up by the serialized key, as the other changelog tests do. The earlier put
        // logged a non-null value, so a null here can only come from the delete being logged.
        MatcherAssert.assertThat(sent.get(hiBytes), CoreMatchers.nullValue());
    }

    @Test
    public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() {
        store.putIfAbsent(hi, there);
        MatcherAssert.assertThat(inner.get(hiBytes), CoreMatchers.equalTo(thereBytes));
    }

    @Test
    public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() {
        store.put(hi, there);
        store.putIfAbsent(hi, world);
        MatcherAssert.assertThat(inner.get(hiBytes), CoreMatchers.equalTo(thereBytes));
    }

    @Test
    public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
        store.putIfAbsent(hi, there);
        MatcherAssert.assertThat((byte[]) sent.get(hiBytes), CoreMatchers.equalTo(thereBytes));
    }

    @Test
    public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
        store.put(hi, there);
        store.putIfAbsent(hi, world);
        MatcherAssert.assertThat((byte[]) sent.get(hiBytes), CoreMatchers.equalTo(thereBytes));
    }

    @Test
    public void shouldReturnCurrentValueOnPutIfAbsent() {
        store.put(hi, there);
        MatcherAssert.assertThat(store.putIfAbsent(hi, world), CoreMatchers.equalTo(there));
    }

    @Test
    public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() {
        MatcherAssert.assertThat(store.putIfAbsent(hi, there), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    @Test
    public void shouldQueryRange() {
        store.put(hello, world);
        store.put(hi, there);
        store.put("zooom", "home");
        // try-with-resources so the iterator is closed even when an assertion fails.
        try (KeyValueIterator<String, String> range = store.range(hello, hi)) {
            MatcherAssert.assertThat(range.next(), CoreMatchers.equalTo(KeyValue.pair(hello, world)));
            MatcherAssert.assertThat(range.next(), CoreMatchers.equalTo(KeyValue.pair(hi, there)));
            Assert.assertFalse(range.hasNext());
        }
    }

    @Test
    public void shouldReturnAllKeyValues() {
        store.put(hello, world);
        store.put(hi, there);
        store.put("zooom", "home");
        // try-with-resources so the iterator is closed even when an assertion fails.
        try (KeyValueIterator<String, String> all = store.all()) {
            MatcherAssert.assertThat(all.next(), CoreMatchers.equalTo(KeyValue.pair(hello, world)));
            MatcherAssert.assertThat(all.next(), CoreMatchers.equalTo(KeyValue.pair(hi, there)));
            MatcherAssert.assertThat(all.next(), CoreMatchers.equalTo(KeyValue.pair("zooom", "home")));
            Assert.assertFalse(all.hasNext());
        }
    }

    @Test
    public void shouldReturnValueOnGetWhenExists() {
        store.put(hello, world);
        MatcherAssert.assertThat(store.get(hello), CoreMatchers.equalTo(world));
    }

    @Test
    public void shouldReturnNullOnGetWhenDoesntExist() {
        MatcherAssert.assertThat(store.get(hello), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    @Test
    public void shouldReturnInnerStoreName() {
        MatcherAssert.assertThat(store.name(), CoreMatchers.equalTo("kv"));
    }

    /**
     * Reads the raw bytes stored under {@code key} in the inner store and decodes them with the
     * value serde.
     *
     * @param key the key, encoded as UTF-8 bytes for the lookup
     * @return whatever the value deserializer produces for the bytes found in the inner store
     */
    private String deserializedValueFromInner(final String key) {
        final byte[] rawValue = inner.get(Bytes.wrap(key.getBytes(StandardCharsets.UTF_8)));
        return valueSerde.deserializer().deserialize("blah", rawValue);
    }
}
