package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.class */
public class HighAvailabilityStreamsPartitionAssignorTest {
    private static final String USER_END_POINT = "localhost:8080";
    private static final String APPLICATION_ID = "stream-partition-assignor-test";

    // Mocks assigned by createMockTaskManager(...) / createMockAdminClient(...); null until then.
    private TaskManager taskManager;
    private Admin adminClient;

    // Cluster fixture: topic1 and topic2 with partitions 0-2, topic3 with partitions 0-3.
    private final List<PartitionInfo> infos = Arrays.asList(
        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
    );
    private final Cluster metadata = new Cluster(
        "cluster",
        Collections.singletonList(Node.noNode()),
        this.infos,
        Collections.emptySet(),
        Collections.emptySet()
    );

    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();

    // NOTE: field initializers run in declaration order, so configProps() is invoked here while
    // taskManager, adminClient and every field declared below this line are still null. This initial
    // config is replaced by configurePartitionAssignorWith(...). Do not reorder these declarations
    // without checking that the initial StreamsConfig can still be constructed.
    private StreamsConfig streamsConfig = new StreamsConfig(configProps());
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);

    // Member id -> subscription, filled in by each test before calling assign().
    private final Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();

    // Shared holders handed to the assignor through the config map built by configProps().
    private final AtomicInteger assignmentError = new AtomicInteger();
    private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
    private final MockTime time = new MockTime();

    /**
     * Builds the base configuration map used both for the {@link StreamsConfig} and for configuring the
     * partition assignor.
     *
     * <p>The map carries the application id, the bootstrap servers, the current values of this test's
     * collaborator fields (task manager, metadata state, admin client, assignment error holder, next
     * probing rebalance holder, time) under their double-underscore keys, and selects
     * {@link HighAvailabilityTaskAssignor} as the task assignor class. Field values are read at call time,
     * so any field that has not been assigned yet is stored as {@code null}.
     *
     * @return a new mutable map; callers may add overrides to it
     */
    private Map<String, Object> configProps() {
        final Map<String, Object> props = new HashMap<>();
        props.put("application.id", APPLICATION_ID);
        props.put("bootstrap.servers", USER_END_POINT);
        props.put("__task.manager.instance__", this.taskManager);
        props.put("__streams.metadata.state.instance__", this.streamsMetadataState);
        props.put("__streams.admin.client.instance__", this.adminClient);
        props.put("__assignment.error.code__", this.assignmentError);
        props.put("__next.probing.rebalance.ms__", this.nextProbingRebalanceMs);
        props.put("__time__", this.time);
        props.put("internal.task.assignor.class", HighAvailabilityTaskAssignor.class.getName());
        return props;
    }

    /**
     * Configures the assignor under test with the base properties overlaid by the given overrides.
     *
     * <p>Also rebuilds {@code streamsConfig} from the merged properties, switches the task manager and
     * admin client mocks into replay mode, and installs the mock internal topic manager. The task manager
     * and admin client mocks must therefore have been created before this is called.
     *
     * @param map overrides applied on top of {@link #configProps()}
     */
    private void configurePartitionAssignorWith(Map<String, Object> map) {
        final Map<String, Object> effectiveConfig = configProps();
        effectiveConfig.putAll(map);

        this.streamsConfig = new StreamsConfig(effectiveConfig);
        this.partitionAssignor.configure(effectiveConfig);

        EasyMock.replay(this.taskManager, this.adminClient);
        overwriteInternalTopicManagerWithMock();
    }

    /**
     * Creates the task manager mock from a set of task ids, deriving the per-task offset sums via
     * {@link #getTaskOffsetSums(Set)}.
     *
     * @param set the task ids to report from the mocked task manager
     */
    private void createMockTaskManager(Set<TaskId> set) {
        final Map<TaskId, Long> offsetSums = getTaskOffsetSums(set);
        createMockTaskManager(offsetSums);
    }

    /**
     * Creates a nice {@link TaskManager} mock and stores it in {@code taskManager}.
     *
     * <p>The mock answers {@code builder()} with this test's topology builder, {@code getTaskOffsetSums()}
     * with the given map and {@code processId()} with {@code AssignmentTestUtils.UUID_1}, any number of
     * times. The mock is left in record mode. Afterwards the application id is set on the builder and
     * the topology is built.
     *
     * @param map the task offset sums the mock should report
     */
    private void createMockTaskManager(Map<TaskId, Long> map) {
        final TaskManager mockedTaskManager = EasyMock.createNiceMock(TaskManager.class);
        this.taskManager = mockedTaskManager;

        EasyMock.expect(mockedTaskManager.builder()).andReturn(this.builder).anyTimes();
        EasyMock.expect(mockedTaskManager.getTaskOffsetSums()).andReturn(map).anyTimes();
        EasyMock.expect(mockedTaskManager.processId()).andReturn(AssignmentTestUtils.UUID_1).anyTimes();

        this.builder.setApplicationId(APPLICATION_ID);
        this.builder.buildTopology();
    }

    /**
     * Creates the admin client mock and stubs {@code listOffsets(...)} to report the given end offsets.
     *
     * <p>Each entry of {@code map} becomes a replayed nice mock of {@code ListOffsetsResultInfo} whose
     * {@code offset()} returns the entry's value. Those are wrapped in an already-completed future that is
     * returned from {@code ListOffsetsResult.all()}. The {@link ListOffsetsResult} mock is replayed here;
     * the admin client mock itself is left in record mode.
     *
     * @param map end offset to report per topic partition
     */
    private void createMockAdminClient(Map<TopicPartition, Long> map) {
        this.adminClient = EasyMock.createMock(AdminClient.class);

        final ListOffsetsResult listOffsetsResult = EasyMock.createNiceMock(ListOffsetsResult.class);

        final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsetInfos =
            map.entrySet().stream().collect(Collectors.toMap(
                Map.Entry::getKey,
                entry -> {
                    final ListOffsetsResult.ListOffsetsResultInfo info =
                        EasyMock.createNiceMock(ListOffsetsResult.ListOffsetsResultInfo.class);
                    EasyMock.expect(info.offset()).andStubReturn(entry.getValue());
                    EasyMock.replay(info);
                    return info;
                }));

        final KafkaFutureImpl<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> allFuture =
            new KafkaFutureImpl<>();
        allFuture.complete(offsetInfos);

        EasyMock.expect(this.adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(listOffsetsResult);
        EasyMock.expect(listOffsetsResult.all()).andReturn(allFuture);
        EasyMock.replay(listOffsetsResult);
    }

    /**
     * Replaces the assignor's internal topic manager with a {@link MockInternalTopicManager} built from the
     * current {@code streamsConfig} and the mock client supplier's restore consumer.
     */
    private void overwriteInternalTopicManagerWithMock() {
        final MockInternalTopicManager mockTopicManager = new MockInternalTopicManager(
            this.streamsConfig,
            this.mockClientSupplier.restoreConsumer,
            false
        );
        this.partitionAssignor.setInternalTopicManager(mockTopicManager);
    }

    /**
     * Installs a default admin client mock that reports {@code AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS};
     * individual tests may replace it.
     */
    @Before
    public void setUp() {
        final Map<TopicPartition, Long> defaultEndOffsets = AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS;
        createMockAdminClient(defaultEndOffsets);
    }

    /**
     * When {@code listOffsets} on the admin client throws, the previous owner ("consumer1") must keep all
     * three tasks as active, the new member ("consumer2") must get none, and one of the two assignments
     * must carry a next-rebalance time of now plus the probing rebalance interval.
     */
    @Test
    public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTriggerRebalanceIfEndOffsetFetchFailsAndHighAvailabilityEnabled() {
        final long rebalanceIntervalMs = 300000L;
        final String firstConsumer = "consumer1";
        final String newConsumer = "consumer2";

        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
        final Set<TaskId> allTasks =
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);

        createMockTaskManager(allTasks);
        // Replace the admin client created in setUp() with one whose end-offset lookup fails.
        this.adminClient = EasyMock.createMock(AdminClient.class);
        EasyMock.expect(this.adminClient.listOffsets(EasyMock.anyObject()))
            .andThrow(new StreamsException("Should be handled"));
        configurePartitionAssignorWith(Collections.singletonMap("probing.rebalance.interval.ms", rebalanceIntervalMs));

        this.subscriptions.put(
            firstConsumer,
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("source1"),
                getInfo(AssignmentTestUtils.UUID_1, allTasks).encode()));
        this.subscriptions.put(
            newConsumer,
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("source1"),
                getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS).encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();

        final AssignmentInfo firstConsumerInfo = AssignmentInfo.decode(assignments.get(firstConsumer).userData());
        final List<TaskId> firstConsumerActiveTasks = firstConsumerInfo.activeTasks();
        final AssignmentInfo newConsumerInfo = AssignmentInfo.decode(assignments.get(newConsumer).userData());
        final List<TaskId> newConsumerActiveTasks = newConsumerInfo.activeTasks();

        MatcherAssert.assertThat(firstConsumerActiveTasks, CoreMatchers.equalTo(new ArrayList<>(allTasks)));
        MatcherAssert.assertThat(newConsumerActiveTasks, Matchers.empty());

        // The expected deadline is accepted on either member's assignment.
        MatcherAssert.assertThat(
            this.time.milliseconds() + rebalanceIntervalMs,
            Matchers.anyOf(
                Matchers.is(firstConsumerInfo.nextRebalanceMs()),
                Matchers.is(newConsumerInfo.nextRebalanceMs())));
    }

    /**
     * With the store changelog reporting {@code Long.MAX_VALUE} end offsets for three partitions,
     * "consumer1" must keep all three tasks as active and "consumer2" must get none, no assignment error
     * may be recorded, and only consumer1's assignment may carry a next-rebalance time of now plus the
     * probing rebalance interval (consumer2's stays at {@code Long.MAX_VALUE}).
     */
    @Test
    public void shouldScheduleProbingRebalanceOnThisClientIfWarmupTasksRequired() {
        final long rebalanceIntervalMs = 300000L;
        final String firstConsumer = "consumer1";
        final String newConsumer = "consumer2";

        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
        final Set<TaskId> allTasks =
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);

        createMockTaskManager(allTasks);
        createMockAdminClient(getTopicPartitionOffsetsMap(
            Collections.singletonList("stream-partition-assignor-test-store1-changelog"),
            Collections.singletonList(3)));
        configurePartitionAssignorWith(Collections.singletonMap("probing.rebalance.interval.ms", rebalanceIntervalMs));

        this.subscriptions.put(
            firstConsumer,
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("source1"),
                getInfo(AssignmentTestUtils.UUID_1, allTasks).encode()));
        this.subscriptions.put(
            newConsumer,
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("source1"),
                getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS).encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();

        // Decode each member's user data once and reuse it for every assertion below.
        final AssignmentInfo firstConsumerInfo = AssignmentInfo.decode(assignments.get(firstConsumer).userData());
        final AssignmentInfo newConsumerInfo = AssignmentInfo.decode(assignments.get(newConsumer).userData());
        final List<TaskId> firstConsumerActiveTasks = firstConsumerInfo.activeTasks();
        final List<TaskId> newConsumerActiveTasks = newConsumerInfo.activeTasks();

        MatcherAssert.assertThat(firstConsumerActiveTasks, CoreMatchers.equalTo(new ArrayList<>(allTasks)));
        MatcherAssert.assertThat(newConsumerActiveTasks, Matchers.empty());
        MatcherAssert.assertThat(this.assignmentError.get(), CoreMatchers.equalTo(AssignorError.NONE.code()));

        final long nextScheduledRebalanceOnThisClient = firstConsumerInfo.nextRebalanceMs();
        final long nextScheduledRebalanceOnOtherClient = newConsumerInfo.nextRebalanceMs();

        MatcherAssert.assertThat(
            nextScheduledRebalanceOnThisClient,
            CoreMatchers.equalTo(this.time.milliseconds() + rebalanceIntervalMs));
        MatcherAssert.assertThat(nextScheduledRebalanceOnOtherClient, CoreMatchers.equalTo(Long.MAX_VALUE));
    }

    /**
     * Builds an end-offset map that assigns {@code Long.MAX_VALUE} to every partition of the given
     * changelog topics.
     *
     * @param list  changelog topic names
     * @param list2 number of partitions for the topic at the same index in {@code list}
     * @return a new mutable map with one entry per partition {@code 0..numPartitions-1} of each topic
     * @throws IllegalStateException if the two lists differ in size
     */
    private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(List<String> list, List<Integer> list2) {
        if (list.size() != list2.size()) {
            throw new IllegalStateException("Passed in " + list.size() + " changelog topic names, but " + list2.size() + " different numPartitions for the topics");
        }
        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
        for (int i = 0; i < list.size(); i++) {
            final String topic = list.get(i);
            final int numPartitions = list2.get(i);
            for (int partition = 0; partition < numPartitions; partition++) {
                changelogEndOffsets.put(new TopicPartition(topic, partition), Long.MAX_VALUE);
            }
        }
        return changelogEndOffsets;
    }

    /**
     * Creates the {@link SubscriptionInfo} a member with the given process id would send, using version 8
     * for both version arguments, no user end point, and offset sums derived from the given tasks via
     * {@link #getTaskOffsetSums(Set)}.
     *
     * @param uuid the member's process id
     * @param set  the tasks the member reports
     * @return the subscription info to encode into the subscription's user data
     */
    private static SubscriptionInfo getInfo(UUID uuid, Set<TaskId> set) {
        final int version = 8;
        final String userEndPoint = null;
        final Map<TaskId, Long> taskOffsetSums = getTaskOffsetSums(set);
        final byte uniqueField = (byte) 0;
        return new SubscriptionInfo(version, version, uuid, userEndPoint, taskOffsetSums, uniqueField);
    }

    /**
     * Maps every given task to an offset sum of {@code -2L}, then adds an entry with offset sum {@code 0L}
     * for every task in {@code AssignmentTestUtils.EMPTY_TASKS}.
     *
     * <p>NOTE(review): {@code -2L} is presumably the "latest offset" sentinel used for running active
     * tasks; confirm against the Task constants.
     *
     * @param set the tasks to report with the {@code -2L} offset sum
     * @return a new map from task id to offset sum
     */
    private static Map<TaskId, Long> getTaskOffsetSums(Set<TaskId> set) {
        final Map<TaskId, Long> taskOffsetSums = set.stream()
            .collect(Collectors.toMap(task -> task, task -> -2L));
        final Map<TaskId, Long> emptyTaskOffsetSums = AssignmentTestUtils.EMPTY_TASKS.stream()
            .collect(Collectors.toMap(task -> task, task -> 0L));
        taskOffsetSums.putAll(emptyTaskOffsetSums);
        return taskOffsetSums;
    }
}
