package io.confluent.kafka.replication.push;

import io.confluent.kafka.replication.push.utils.TestPushSession;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/kafka/replication/push/PushManagerImplTest.class */
/**
 * Unit tests for {@link PushManagerImpl}: lifecycle transitions, the push entry points
 * before and after shutdown, and pusher lookup from one thread and from several threads.
 */
public class PushManagerImplTest {
    /** Config key used by the pusher lookup tests to cap the number of pushers. */
    private static final String MAX_REPLICA_PUSHERS_CONFIG = "confluent.replication.max.replica.pushers";

    /**
     * Checks that the manager is inactive until {@code startup()}, active afterwards, and that
     * {@code shutdown()} returns {@code true} on the first call and {@code false} on a repeated call.
     */
    @Test
    public void testShutdown() {
        PushManagerImpl manager = createManager();
        try {
            Assertions.assertFalse(manager.isActive());
            manager.startup();
            Assertions.assertTrue(manager.isActive());
            Assertions.assertTrue(manager.shutdown());
            Assertions.assertFalse(manager.isActive());
            Assertions.assertFalse(manager.shutdown());
        } finally {
            // Keeps the manager from outliving the test when an assertion above fails.
            manager.shutdown();
        }
    }

    /**
     * Checks that {@code startPush}, {@code onLeaderAppend} and {@code stopPush} complete without
     * throwing on a started manager and throw {@link IllegalStateException} after shutdown.
     */
    @Test
    public void testPushMethodsBeforeAfterShutdown() {
        PushManagerImpl manager = createManager();
        try {
            manager.startup();
            TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo");
            int replicaId = 1;
            TestPushSession pushSession = new TestPushSession(12, 34L, 56L);
            MemoryRecords records = Mockito.mock(MemoryRecords.class);
            Mockito.when(records.buffer()).thenReturn(ByteBuffer.allocate(12));

            // While the manager is running none of the calls may throw.
            manager.startPush(topicIdPartition, replicaId, pushSession);
            manager.onLeaderAppend(topicIdPartition, Collections.singleton(replicaId), 1110987654321L, 1234567891011L, records);
            manager.stopPush(topicIdPartition, Collections.singleton(replicaId), true);

            manager.shutdown();

            // The very same calls must be rejected once the manager has been shut down.
            Assertions.assertThrows(IllegalStateException.class,
                () -> manager.startPush(topicIdPartition, replicaId, pushSession));
            Assertions.assertThrows(IllegalStateException.class,
                () -> manager.onLeaderAppend(topicIdPartition, Collections.singleton(replicaId), 1110987654321L, 1234567891011L, records));
            Assertions.assertThrows(IllegalStateException.class,
                () -> manager.stopPush(topicIdPartition, Collections.singleton(replicaId), true));
        } finally {
            manager.shutdown();
        }
    }

    /**
     * Calls {@code onLeaderAppend} with an empty replica set on a manager that was never started.
     * {@code sizeInBytes()} on the records mock is stubbed to throw {@link AssertionError}, so the
     * test fails if the call reads the size of the records.
     */
    @Test
    public void testOnLeaderAppendWithEmptyReplicaSet() {
        PushManagerImpl manager = createManager();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo");
        MemoryRecords records = Mockito.mock(MemoryRecords.class);
        Mockito.when(records.sizeInBytes()).thenThrow(new AssertionError());
        manager.onLeaderAppend(topicIdPartition, Collections.emptySet(), 1110987654321L, 1234567891011L, records);
    }

    /**
     * With the pusher limit configured to 2, checks that replica ids 0 and 23 resolve to the same
     * pusher and that replica ids 100 and 50 resolve to the same pusher.
     */
    @Test
    public void testGetPusher() {
        PushManagerImpl manager = createManager(Collections.singletonMap(MAX_REPLICA_PUSHERS_CONFIG, "2"));
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo");
        try {
            manager.startup();
            Pusher pusherFor0 = manager.getPusher(topicIdPartition, 0);
            Pusher pusherFor100 = manager.getPusher(topicIdPartition, 100);
            Pusher pusherFor23 = manager.getPusher(topicIdPartition, 23);
            Pusher pusherFor50 = manager.getPusher(topicIdPartition, 50);
            Assertions.assertEquals(pusherFor0, pusherFor23);
            Assertions.assertEquals(pusherFor100, pusherFor50);
        } finally {
            manager.shutdown();
        }
    }

    /**
     * Looks up pushers for five replica ids from a two-thread pool and checks that exactly two
     * distinct pushers were handed out, one of them twice and the other three times.
     */
    @Test
    public void testGetPusherThreadSafety() {
        PushManagerImpl manager = createManager(Collections.singletonMap(MAX_REPLICA_PUSHERS_CONFIG, "2"));
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo");
        int[] replicaIds = {0, 100, 23, 87, 42};
        // Number of lookups that returned each distinct pusher.
        ConcurrentHashMap<Pusher, Integer> lookupsPerPusher = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            manager.startup();
            for (int replicaId : replicaIds) {
                // merge() counts the lookup in a single atomic step.
                executor.execute(() ->
                    lookupsPerPusher.merge(manager.getPusher(topicIdPartition, replicaId), 1, Integer::sum));
            }
            executor.shutdown();
            // Fail here on a timeout instead of asserting on partially collected counts.
            Assertions.assertTrue(executor.awaitTermination(10L, TimeUnit.SECONDS),
                "Service threads did not shutdown in 10 seconds");
            Assertions.assertEquals(2, lookupsPerPusher.size());
            Assertions.assertTrue(lookupsPerPusher.containsValue(2));
            Assertions.assertTrue(lookupsPerPusher.containsValue(3));
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller and keep the original exception as the cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for service threads to shutdown", e);
        } finally {
            // No-op after a clean termination; otherwise stops workers that are still running.
            executor.shutdownNow();
            manager.shutdown();
        }
    }

    /**
     * Creates a manager with an empty configuration map.
     *
     * @return a new manager on which {@code startup()} has not been called
     */
    private static PushManagerImpl createManager() {
        return createManager(Collections.emptyMap());
    }

    /**
     * Creates a manager from the given configuration, using the system clock, a fresh
     * {@link Metrics} instance, a node lookup that maps every id to {@code localhost:9092},
     * and a factory that returns a new Mockito mock of {@link NetworkClient} on each call.
     *
     * @param map configuration entries passed to {@link ReplicationConfig}
     * @return a new manager on which {@code startup()} has not been called
     */
    private static PushManagerImpl createManager(Map<String, String> map) {
        return new PushManagerImpl(
            new ReplicationConfig(map),
            Time.SYSTEM,
            new Metrics(),
            nodeId -> Optional.of(new Node(nodeId, "localhost", 9092)),
            nodeId -> Mockito.mock(NetworkClient.class));
    }
}
