package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/WriteConsistencyVectorTest.class */
public class WriteConsistencyVectorTest {
    private ProcessorContextImpl context;
    private final StreamsConfig streamsConfig = streamsConfigMock();
    private final RecordCollector recordCollector = (RecordCollector) EasyMock.mock(RecordCollector.class);
    private static final long TIMESTAMP = 21;
    private static final long STREAM_TIME = 50;
    private static final String REGISTERED_STORE_NAME = "registered-store";
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
    private static final long VALUE = 42;
    private static final byte[] VALUE_BYTES = String.valueOf(VALUE).getBytes();
    private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1);
    private static final Integer INPUT_PARTITION = 0;
    private static final Long INPUT_OFFSET = 100L;

    @Before
    public void setup() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) EasyMock.mock(ProcessorStateManager.class);
        EasyMock.expect(processorStateManager.taskType()).andStubReturn(Task.TaskType.ACTIVE);
        EasyMock.expect(processorStateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).andStubReturn(CHANGELOG_PARTITION);
        EasyMock.replay(new Object[]{processorStateManager});
        this.context = new ProcessorContextImpl((TaskId) EasyMock.mock(TaskId.class), this.streamsConfig, processorStateManager, (StreamsMetricsImpl) EasyMock.mock(StreamsMetricsImpl.class), (ThreadCache) EasyMock.mock(ThreadCache.class));
        StreamTask streamTask = (StreamTask) EasyMock.mock(StreamTask.class);
        EasyMock.expect(Long.valueOf(streamTask.streamTime())).andReturn(Long.valueOf(STREAM_TIME));
        EasyMock.expect(streamTask.recordCollector()).andStubReturn(this.recordCollector);
        EasyMock.replay(new Object[]{streamTask});
        this.context.transitionToActive(streamTask, (RecordCollector) null, (ThreadCache) null);
        this.context.setCurrentNode(new ProcessorNode("fake", (Processor) null, new HashSet(Arrays.asList("LocalKeyValueStore", "LocalTimestampedKeyValueStore", "LocalWindowStore", "LocalTimestampedWindowStore", "LocalSessionStore"))));
    }

    @Test
    public void shouldSendConsistencyVectorToChangelogTopic() {
        Position emptyPosition = Position.emptyPosition();
        emptyPosition.withComponent(INPUT_TOPIC_NAME, INPUT_PARTITION.intValue(), INPUT_OFFSET.longValue());
        this.context.setRecordContext(new ProcessorRecordContext(-1L, INPUT_OFFSET.longValue(), INPUT_PARTITION.intValue(), INPUT_TOPIC_NAME, new RecordHeaders()));
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        recordHeaders.add(new RecordHeader("c", PositionSerde.serialize(emptyPosition).array()));
        this.recordCollector.send(CHANGELOG_PARTITION.topic(), KEY_BYTES, VALUE_BYTES, recordHeaders, Integer.valueOf(CHANGELOG_PARTITION.partition()), Long.valueOf(TIMESTAMP), InternalProcessorContext.BYTES_KEY_SERIALIZER, InternalProcessorContext.BYTEARRAY_VALUE_SERIALIZER);
        StreamTask streamTask = (StreamTask) EasyMock.createNiceMock(StreamTask.class);
        EasyMock.replay(new Object[]{this.recordCollector, streamTask});
        this.context.transitionToActive(streamTask, this.recordCollector, (ThreadCache) null);
        this.context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, TIMESTAMP, emptyPosition);
        EasyMock.verify(new Object[]{this.recordCollector});
    }

    private StreamsConfig streamsConfigMock() {
        StreamsConfig streamsConfig = (StreamsConfig) EasyMock.mock(StreamsConfig.class);
        HashMap hashMap = new HashMap();
        hashMap.put("__iq.consistency.offset.vector.enabled__", true);
        EasyMock.expect(streamsConfig.originals()).andStubReturn(hashMap);
        EasyMock.expect(streamsConfig.values()).andStubReturn(Collections.emptyMap());
        EasyMock.expect(streamsConfig.getString("application.id")).andStubReturn("add-id");
        EasyMock.expect(streamsConfig.defaultValueSerde()).andStubReturn(Serdes.ByteArray());
        EasyMock.expect(streamsConfig.defaultKeySerde()).andStubReturn(Serdes.ByteArray());
        EasyMock.replay(new Object[]{streamsConfig});
        return streamsConfig;
    }
}
