package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Unit tests for {@link ChangelogTopics}.
 *
 * <p>Each test stubs {@link InternalTopicManager#makeReady} on a mock, builds a
 * {@link ChangelogTopics} for a single subtopology with three tasks, calls
 * {@code setup()} and then checks which changelog partitions are reported as
 * pre-existing, both per task and split into source-topic-based and
 * non-source-topic-based partitions.
 *
 * <p>The topic configs and the {@code TopicsInfo} objects that reference them are
 * instance fields rather than static constants: several tests assert that the
 * changelog config reports 3 partitions after {@code setup()}, so the config is
 * mutated by the code under test. JUnit 4 creates a fresh test-class instance per
 * test method, so per-instance fixtures keep that assertion from being satisfied
 * by state left over from a previously executed test.
 */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ChangelogTopicsTest {
    private static final Map<String, String> TOPIC_CONFIG = Collections.singletonMap("config1", "val1");
    private static final String REPARTITION_TOPIC_NAME = "repartition";
    private static final String CHANGELOG_TOPIC_NAME1 = "changelog1";
    private static final String SINK_TOPIC_NAME = "sink";
    private static final String SOURCE_TOPIC_NAME = "source";
    private static final TaskId TASK_0_0 = new TaskId(0, 0);
    private static final TaskId TASK_0_1 = new TaskId(0, 1);
    private static final TaskId TASK_0_2 = new TaskId(0, 2);

    final InternalTopicManager internalTopicManager = Mockito.mock(InternalTopicManager.class);

    // Fresh config objects per test instance; see the class-level note on isolation.
    private final RepartitionTopicConfig repartitionTopicConfig =
        new RepartitionTopicConfig(REPARTITION_TOPIC_NAME, TOPIC_CONFIG);
    private final UnwindowedUnversionedChangelogTopicConfig changelogTopicConfig =
        new UnwindowedUnversionedChangelogTopicConfig(CHANGELOG_TOPIC_NAME1, TOPIC_CONFIG);

    // One dedicated (non-source) changelog topic.
    private final InternalTopologyBuilder.TopicsInfo topicsInfoWithChangelog = new InternalTopologyBuilder.TopicsInfo(
        Utils.mkSet(SINK_TOPIC_NAME),
        Utils.mkSet(SOURCE_TOPIC_NAME),
        Utils.mkMap(Utils.mkEntry(REPARTITION_TOPIC_NAME, repartitionTopicConfig)),
        Utils.mkMap(Utils.mkEntry(CHANGELOG_TOPIC_NAME1, changelogTopicConfig))
    );

    // No changelog topics at all, i.e. a stateless subtopology.
    private final InternalTopologyBuilder.TopicsInfo statelessTopicsInfo = new InternalTopologyBuilder.TopicsInfo(
        Utils.mkSet(SINK_TOPIC_NAME),
        Utils.mkSet(SOURCE_TOPIC_NAME),
        Utils.mkMap(Utils.mkEntry(REPARTITION_TOPIC_NAME, repartitionTopicConfig)),
        Utils.mkMap()
    );

    // The only changelog entry is keyed by the source topic name.
    private final InternalTopologyBuilder.TopicsInfo topicsInfoWithSourceChangelog = new InternalTopologyBuilder.TopicsInfo(
        Utils.mkSet(SINK_TOPIC_NAME),
        Utils.mkSet(SOURCE_TOPIC_NAME),
        Utils.mkMap(Utils.mkEntry(REPARTITION_TOPIC_NAME, repartitionTopicConfig)),
        Utils.mkMap(Utils.mkEntry(SOURCE_TOPIC_NAME, changelogTopicConfig))
    );

    // A source-topic-keyed entry (with a null config) plus a dedicated changelog topic.
    private final InternalTopologyBuilder.TopicsInfo topicsInfoWithBothChangelogs = new InternalTopologyBuilder.TopicsInfo(
        Utils.mkSet(SINK_TOPIC_NAME),
        Utils.mkSet(SOURCE_TOPIC_NAME),
        Utils.mkMap(Utils.mkEntry(REPARTITION_TOPIC_NAME, repartitionTopicConfig)),
        Utils.mkMap(
            Utils.mkEntry(SOURCE_TOPIC_NAME, null),
            Utils.mkEntry(CHANGELOG_TOPIC_NAME1, changelogTopicConfig)
        )
    );

    /**
     * Builds a {@link ChangelogTopics} for subtopology 0 with tasks 0_0, 0_1 and 0_2
     * using the given topics info, and runs {@code setup()} on it.
     *
     * @param topicsInfo the topics of the single subtopology under test
     * @return the already set-up instance
     */
    private ChangelogTopics setUpChangelogTopics(final InternalTopologyBuilder.TopicsInfo topicsInfo) {
        final ChangelogTopics changelogTopics = new ChangelogTopics(
            internalTopicManager,
            Utils.mkMap(Utils.mkEntry(AssignmentTestUtils.SUBTOPOLOGY_0, topicsInfo)),
            Utils.mkMap(Utils.mkEntry(AssignmentTestUtils.SUBTOPOLOGY_0, Utils.mkSet(TASK_0_0, TASK_0_1, TASK_0_2))),
            "[test] "
        );
        changelogTopics.setup();
        return changelogTopics;
    }

    @Test
    public void shouldNotContainChangelogsForStatelessTasks() {
        Mockito.when(internalTopicManager.makeReady(Collections.emptyMap())).thenReturn(Collections.emptySet());

        final ChangelogTopics changelogTopics = setUpChangelogTopics(statelessTopicsInfo);

        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_2), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(changelogTopics.preExistingSourceTopicBasedPartitions(), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(changelogTopics.preExistingNonSourceTopicBasedPartitions(), CoreMatchers.is(Collections.emptySet()));
    }

    @Test
    public void shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated() {
        // makeReady reports the changelog topic in its result, i.e. as newly created.
        Mockito.when(internalTopicManager.makeReady(Utils.mkMap(Utils.mkEntry(CHANGELOG_TOPIC_NAME1, changelogTopicConfig))))
            .thenReturn(Utils.mkSet(CHANGELOG_TOPIC_NAME1));

        final ChangelogTopics changelogTopics = setUpChangelogTopics(topicsInfoWithChangelog);

        MatcherAssert.assertThat(changelogTopicConfig.numberOfPartitions().orElse(Integer.MIN_VALUE), CoreMatchers.is(3));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_2), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(changelogTopics.preExistingSourceTopicBasedPartitions(), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(changelogTopics.preExistingNonSourceTopicBasedPartitions(), CoreMatchers.is(Collections.emptySet()));
    }

    @Test
    public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() {
        // makeReady reports nothing as newly created, so the changelog topic already existed.
        Mockito.when(internalTopicManager.makeReady(Utils.mkMap(Utils.mkEntry(CHANGELOG_TOPIC_NAME1, changelogTopicConfig))))
            .thenReturn(Collections.emptySet());

        final ChangelogTopics changelogTopics = setUpChangelogTopics(topicsInfoWithChangelog);

        MatcherAssert.assertThat(changelogTopicConfig.numberOfPartitions().orElse(Integer.MIN_VALUE), CoreMatchers.is(3));
        final TopicPartition changelogPartition0 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
        final TopicPartition changelogPartition1 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
        final TopicPartition changelogPartition2 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 2);
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0), CoreMatchers.is(Utils.mkSet(changelogPartition0)));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1), CoreMatchers.is(Utils.mkSet(changelogPartition1)));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_2), CoreMatchers.is(Utils.mkSet(changelogPartition2)));
        MatcherAssert.assertThat(changelogTopics.preExistingSourceTopicBasedPartitions(), CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(
            changelogTopics.preExistingNonSourceTopicBasedPartitions(),
            CoreMatchers.is(Utils.mkSet(changelogPartition0, changelogPartition1, changelogPartition2))
        );
    }

    @Test
    public void shouldOnlyContainPreExistingSourceBasedChangelogs() {
        // The source-topic-keyed changelog is not handed to makeReady: it is stubbed with an empty map.
        Mockito.when(internalTopicManager.makeReady(Collections.emptyMap())).thenReturn(Collections.emptySet());

        final ChangelogTopics changelogTopics = setUpChangelogTopics(topicsInfoWithSourceChangelog);

        final TopicPartition sourcePartition0 = new TopicPartition(SOURCE_TOPIC_NAME, 0);
        final TopicPartition sourcePartition1 = new TopicPartition(SOURCE_TOPIC_NAME, 1);
        final TopicPartition sourcePartition2 = new TopicPartition(SOURCE_TOPIC_NAME, 2);
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0), CoreMatchers.is(Utils.mkSet(sourcePartition0)));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1), CoreMatchers.is(Utils.mkSet(sourcePartition1)));
        MatcherAssert.assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_2), CoreMatchers.is(Utils.mkSet(sourcePartition2)));
        MatcherAssert.assertThat(
            changelogTopics.preExistingSourceTopicBasedPartitions(),
            CoreMatchers.is(Utils.mkSet(sourcePartition0, sourcePartition1, sourcePartition2))
        );
        MatcherAssert.assertThat(changelogTopics.preExistingNonSourceTopicBasedPartitions(), CoreMatchers.is(Collections.emptySet()));
    }

    @Test
    public void shouldContainBothTypesOfPreExistingChangelogs() {
        // Only the dedicated changelog topic is handed to makeReady; nothing is reported as newly created.
        Mockito.when(internalTopicManager.makeReady(Utils.mkMap(Utils.mkEntry(CHANGELOG_TOPIC_NAME1, changelogTopicConfig))))
            .thenReturn(Collections.emptySet());

        final ChangelogTopics changelogTopics = setUpChangelogTopics(topicsInfoWithBothChangelogs);

        MatcherAssert.assertThat(changelogTopicConfig.numberOfPartitions().orElse(Integer.MIN_VALUE), CoreMatchers.is(3));
        final TopicPartition changelogPartition0 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
        final TopicPartition changelogPartition1 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
        final TopicPartition changelogPartition2 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 2);
        final TopicPartition sourcePartition0 = new TopicPartition(SOURCE_TOPIC_NAME, 0);
        final TopicPartition sourcePartition1 = new TopicPartition(SOURCE_TOPIC_NAME, 1);
        final TopicPartition sourcePartition2 = new TopicPartition(SOURCE_TOPIC_NAME, 2);
        MatcherAssert.assertThat(
            changelogTopics.preExistingPartitionsFor(TASK_0_0),
            CoreMatchers.is(Utils.mkSet(sourcePartition0, changelogPartition0))
        );
        MatcherAssert.assertThat(
            changelogTopics.preExistingPartitionsFor(TASK_0_1),
            CoreMatchers.is(Utils.mkSet(sourcePartition1, changelogPartition1))
        );
        MatcherAssert.assertThat(
            changelogTopics.preExistingPartitionsFor(TASK_0_2),
            CoreMatchers.is(Utils.mkSet(sourcePartition2, changelogPartition2))
        );
        MatcherAssert.assertThat(
            changelogTopics.preExistingSourceTopicBasedPartitions(),
            CoreMatchers.is(Utils.mkSet(sourcePartition0, sourcePartition1, sourcePartition2))
        );
        MatcherAssert.assertThat(
            changelogTopics.preExistingNonSourceTopicBasedPartitions(),
            CoreMatchers.is(Utils.mkSet(changelogPartition0, changelogPartition1, changelogPartition2))
        );
    }
}
