package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.class */
public class StoreChangeLoggerTest {
    private final String topic = "topic";
    private final Map<Integer, String> logged = new HashMap();
    private final Map<Integer, Headers> loggedHeaders = new HashMap();
    private final InternalMockProcessorContext context = new InternalMockProcessorContext((StateSerdes<?, ?>) StateSerdes.withBuiltinTypes("topic", Integer.class, String.class), (RecordCollector) new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")) { // from class: org.apache.kafka.streams.state.internals.StoreChangeLoggerTest.1
        /* JADX WARN: Multi-variable type inference failed */
        public <K1, V1> void send(String str, K1 k1, V1 v1, Headers headers, Integer num, Long l, Serializer<K1> serializer, Serializer<V1> serializer2) {
            StoreChangeLoggerTest.this.logged.put((Integer) k1, (String) v1);
            StoreChangeLoggerTest.this.loggedHeaders.put((Integer) k1, headers);
        }

        public <K1, V1> void send(String str, K1 k1, V1 v1, Headers headers, Long l, Serializer<K1> serializer, Serializer<V1> serializer2, StreamPartitioner<? super K1, ? super V1> streamPartitioner) {
            throw new UnsupportedOperationException();
        }
    });
    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>("topic", this.context, StateSerdes.withBuiltinTypes("topic", Integer.class, String.class));

    @Test
    public void testAddRemove() {
        this.context.setTime(1L);
        this.changeLogger.logChange(0, "zero");
        this.changeLogger.logChange(1, "one");
        this.changeLogger.logChange(2, "two");
        Assert.assertEquals("zero", this.logged.get(0));
        Assert.assertEquals("one", this.logged.get(1));
        Assert.assertEquals("two", this.logged.get(2));
        this.changeLogger.logChange(0, (Object) null);
        Assert.assertNull(this.logged.get(0));
    }

    @Test
    public void shouldNotSendRecordHeadersToChangelogTopic() {
        this.context.headers().add(new RecordHeader("key", "value".getBytes()));
        this.changeLogger.logChange(0, "zero");
        Assert.assertNull(this.loggedHeaders.get(0));
    }
}
