/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsRebalanceListener;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.LoggerFactory;

public class StreamsRebalanceListenerTest {
    private final TaskManager taskManager = (TaskManager)EasyMock.mock(TaskManager.class);
    private final StreamThread streamThread = (StreamThread)EasyMock.mock(StreamThread.class);
    private final AtomicInteger assignmentErrorCode = new AtomicInteger();
    private final StreamsRebalanceListener streamsRebalanceListener = new StreamsRebalanceListener((Time)new MockTime(), this.taskManager, this.streamThread, LoggerFactory.getLogger(StreamsRebalanceListenerTest.class), this.assignmentErrorCode);

    @Before
    public void before() {
        EasyMock.expect((Object)this.streamThread.state()).andStubReturn(null);
        EasyMock.expect((Object)this.taskManager.activeTaskIds()).andStubReturn(null);
        EasyMock.expect((Object)this.taskManager.standbyTaskIds()).andStubReturn(null);
    }

    @Test
    public void shouldThrowMissingSourceTopicException() {
        EasyMock.replay((Object[])new Object[]{this.taskManager, this.streamThread});
        this.assignmentErrorCode.set(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code());
        MissingSourceTopicException exception = (MissingSourceTopicException)Assert.assertThrows(MissingSourceTopicException.class, () -> this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList()));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.is((Object)"One or more source topics were missing during rebalance"));
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }

    @Test
    public void shouldSwallowVersionProbingError() {
        EasyMock.expect((Object)this.streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED)).andStubReturn((Object)StreamThread.State.PARTITIONS_REVOKED);
        this.taskManager.handleRebalanceComplete();
        EasyMock.replay((Object[])new Object[]{this.taskManager, this.streamThread});
        this.assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
        this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }

    @Test
    public void shouldThrowTaskAssignmentException() {
        EasyMock.replay((Object[])new Object[]{this.taskManager, this.streamThread});
        this.assignmentErrorCode.set(AssignorError.ASSIGNMENT_ERROR.code());
        TaskAssignmentException exception = (TaskAssignmentException)Assert.assertThrows(TaskAssignmentException.class, () -> this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList()));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.is((Object)"Hit an unexpected exception during task assignment phase of rebalance"));
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }

    @Test
    public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() {
        EasyMock.replay((Object[])new Object[]{this.taskManager, this.streamThread});
        this.assignmentErrorCode.set(Integer.MAX_VALUE);
        TaskAssignmentException exception = (TaskAssignmentException)Assert.assertThrows(TaskAssignmentException.class, () -> this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList()));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.is((Object)"Hit an unrecognized exception during rebalance"));
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }

    @Test
    public void shouldHandleAssignedPartitions() {
        this.taskManager.handleRebalanceComplete();
        EasyMock.expect((Object)this.streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED)).andReturn((Object)StreamThread.State.RUNNING);
        EasyMock.replay((Object[])new Object[]{this.taskManager, this.streamThread});
        this.assignmentErrorCode.set(AssignorError.NONE.code());
        this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }

    @Test
    public void shouldHandleRevokedPartitions() {
        List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("topic", 0));
        EasyMock.expect((Object)this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).andReturn((Object)StreamThread.State.RUNNING);
        this.taskManager.handleRevocation(partitions);
        EasyMock.replay((Object[])new Object[]{this.streamThread, this.taskManager});
        this.streamsRebalanceListener.onPartitionsRevoked(partitions);
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }

    @Test
    public void shouldNotHandleRevokedPartitionsIfStateCannotTransitToPartitionRevoked() {
        EasyMock.expect((Object)this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).andReturn(null);
        EasyMock.replay((Object[])new Object[]{this.streamThread, this.taskManager});
        this.streamsRebalanceListener.onPartitionsRevoked(Collections.singletonList(new TopicPartition("topic", 0)));
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }

    @Test
    public void shouldNotHandleEmptySetOfRevokedPartitions() {
        EasyMock.expect((Object)this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).andReturn((Object)StreamThread.State.RUNNING);
        EasyMock.replay((Object[])new Object[]{this.streamThread, this.taskManager});
        this.streamsRebalanceListener.onPartitionsRevoked(Collections.emptyList());
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }

    @Test
    public void shouldHandleLostPartitions() {
        this.taskManager.handleLostAll();
        EasyMock.replay((Object[])new Object[]{this.streamThread, this.taskManager});
        this.streamsRebalanceListener.onPartitionsLost(Collections.singletonList(new TopicPartition("topic", 0)));
        EasyMock.verify((Object[])new Object[]{this.taskManager, this.streamThread});
    }
}

