package io.confluent.kafka.replication.push;

import io.confluent.kafka.replication.push.buffer.RefCountingMemoryTracker;
import io.confluent.kafka.replication.push.utils.TestPushSession;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AppendRecordsRequestData;
import org.apache.kafka.common.message.AppendRecordsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AppendRecordsRequest;
import org.apache.kafka.common.requests.AppendRecordsResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/kafka/replication/push/PusherThreadTest.class */
/**
 * Unit tests for {@link PusherThread}.
 *
 * <p>The pusher is never started as a real thread here: each test drives it synchronously by
 * calling {@code doWork()} and advances a {@link MockTime} by hand, while a {@link MockClient}
 * stands in for the network layer. The memory tracker is a Mockito mock whose
 * {@code countDown} calls are recorded so tests can check which records were handed back to it.
 *
 * <p>NOTE(review): judging from the assertions in this class, the {@code Runnable} given to a
 * session fires when the session is ended by an error and the {@code BiConsumer} fires when a
 * partition response is processed successfully - confirm against {@code TestPushSession}.
 */
public class PusherThreadTest {
    private static final int LINGER_MS = 10;
    private static final int REPLICA_ID = 1;
    /** Leader epoch used by {@link #newSession(BiConsumer, Runnable)}. */
    private static final int LEADER_EPOCH = 1;
    /** Fixed seed so the generated record payloads are identical on every run. */
    private static final long RANDOM_SEED = 0x5EEDL;
    private static final Uuid TOPIC_ID = Uuid.randomUuid();
    private static final TopicIdPartition PARTITION_0 = new TopicIdPartition(TOPIC_ID, new TopicPartition("foo", 0));
    private static final TopicIdPartition PARTITION_1 = new TopicIdPartition(TOPIC_ID, new TopicPartition("foo", 1));
    /** Session callback placeholders for tests that do not care about callbacks. */
    private static final BiConsumer<Long, Long> IGNORE_BI_CALLBACK = (first, second) -> { };
    private static final Runnable IGNORE_RUNNABLE_CALLBACK = () -> { };

    /**
     * One generator per test instance (not per call): re-seeding on every
     * {@link #setUpRequestData} call would hand both partitions byte-identical payloads.
     */
    private final Random random = new Random(RANDOM_SEED);

    private ReplicationConfig config;
    private MockClient client;
    private Function<Integer, Optional<Node>> nodeResolver;
    private RefCountingMemoryTracker<MemoryRecords> tracker;
    private Time time;
    private PusherThread pusher;

    /** Builds a fresh pusher wired to a mock client, mock clock and mock memory tracker. */
    @BeforeEach
    @SuppressWarnings("unchecked") // Mockito.mock(Class) can only return the raw generic type.
    public void setUp() {
        config = new ReplicationConfig(Collections.singletonMap("confluent.replication.linger.ms", LINGER_MS));
        nodeResolver = id -> Optional.of(new Node(id, "http://localhost", 9092 + id));
        tracker = Mockito.mock(RefCountingMemoryTracker.class);
        time = new MockTime(0L);
        client = new MockClient(time);
        pusher = new PusherThread("Pusher-thread-0-test", config, client, nodeResolver, tracker, time);
    }

    /** Shuts the pusher down; tolerates a {@code setUp} that failed before creating it. */
    @AfterEach
    public void tearDown() {
        if (pusher != null) {
            pusher.shutdown();
        }
    }

    /** A second {@code startPush} for the same partition must fail with {@link IllegalStateException}. */
    @Test
    public void testConsecutiveStartPushCallsThrow() {
        CompletableFuture<?> first =
            pusher.startPush(PARTITION_0, REPLICA_ID, newSession(IGNORE_BI_CALLBACK, IGNORE_RUNNABLE_CALLBACK));
        CompletableFuture<?> second =
            pusher.startPush(PARTITION_0, REPLICA_ID, newSession(IGNORE_BI_CALLBACK, IGNORE_RUNNABLE_CALLBACK));

        pusher.doWork();

        Assertions.assertTrue(first.isDone() && !first.isCompletedExceptionally());
        Assertions.assertTrue(second.isDone() && second.isCompletedExceptionally());
        ExecutionException failure = Assertions.assertThrows(
            ExecutionException.class,
            second::get,
            "Expected IllegalStateException after a second consecutive startPush call");
        Assertions.assertTrue(failure.getCause() instanceof IllegalStateException);
    }

    /** After stopping a session, restarting with a lower replica epoch must fail. */
    @Test
    public void testStartPushWithOutdatedReplicaEpochFails() {
        long olderReplicaEpoch = 123L;

        CompletableFuture<?> newer = pusher.startPush(PARTITION_0, REPLICA_ID,
            new TestPushSession(LEADER_EPOCH, olderReplicaEpoch + 1, 1L, IGNORE_BI_CALLBACK, IGNORE_RUNNABLE_CALLBACK));
        pusher.doWork();
        Assertions.assertTrue(newer.isDone() && !newer.isCompletedExceptionally());

        pusher.stopPush(PARTITION_0, REPLICA_ID, true);
        pusher.doWork();

        CompletableFuture<?> outdated = pusher.startPush(PARTITION_0, REPLICA_ID,
            new TestPushSession(LEADER_EPOCH, olderReplicaEpoch, 1L, IGNORE_BI_CALLBACK, IGNORE_RUNNABLE_CALLBACK));
        pusher.doWork();
        Assertions.assertTrue(outdated.isDone() && outdated.isCompletedExceptionally());
    }

    /**
     * A log-start-offset update alone is only sent once {@code maxWaitMs} has elapsed, whereas
     * appended records are sent as soon as the linger time has elapsed.
     */
    @Test
    public void testPusherThreadWithLingerAndMaxWaitMs() {
        Assertions.assertEquals(LINGER_MS, config.lingerMs());
        pusher.startPush(PARTITION_0, REPLICA_ID, newSession(IGNORE_BI_CALLBACK, IGNORE_RUNNABLE_CALLBACK));

        pusher.onLogStartOffsetUpdate(PARTITION_0, REPLICA_ID, 2L);
        pusher.doWork();
        Assertions.assertTrue(client.requests().isEmpty());

        // Linger alone is not enough when there are no records to push.
        time.sleep(LINGER_MS);
        pusher.doWork();
        Assertions.assertTrue(client.requests().isEmpty());

        time.sleep(config.maxWaitMs() - LINGER_MS);
        pusher.doWork();
        Assertions.assertEquals(1, client.requests().size());

        client.reset();
        pusher.onLeaderAppend(PARTITION_0, REPLICA_ID, 3L, 4L, new MemoryRecords(ByteBuffer.allocate(1024)));
        pusher.doWork();
        Assertions.assertTrue(client.requests().isEmpty());

        time.sleep(LINGER_MS);
        pusher.doWork();
        Assertions.assertEquals(1, client.requests().size());
    }

    @Test
    public void testResponseWithRetryTimeoutErrorEndsSession() {
        testResponseErrorEndsSession(() -> {
            pusher.doWork();
            time.sleep(config.retryTimeoutMs() + 1);
            client.respond(newResponseData(PARTITION_0), true);
        });
    }

    @Test
    public void testResponseWithUnsupportedVersionErrorEndsSession() {
        testResponseErrorEndsSession(
            () -> client.prepareUnsupportedVersionResponse(request -> request instanceof AppendRecordsRequest));
    }

    @Test
    public void testResponseWithAuthenticationErrorEndsSession() {
        testResponseErrorEndsSession(
            () -> client.createPendingAuthenticationError(nodeResolver.apply(REPLICA_ID).get(), 0L));
    }

    @Test
    public void testResponseWithStaleBrokerEpochErrorEndsSession() {
        testResponseErrorEndsSession(() -> client.prepareResponse(
            new AppendRecordsResponse(new AppendRecordsResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code()))));
    }

    /**
     * Shared body of the "error ends session" tests.
     *
     * <p>Starts a session on {@link #PARTITION_0}, appends records, lets the linger time pass,
     * runs {@code runnable} to inject the failure and then checks that every appended record was
     * passed to {@code tracker.countDown}, that the session's {@code BiConsumer} never fired and
     * that its {@code Runnable} did.
     *
     * @param runnable injects the error condition into the mock client
     */
    private void testResponseErrorEndsSession(Runnable runnable) {
        AtomicBoolean biCallbackFired = new AtomicBoolean();
        AtomicBoolean runnableCallbackFired = new AtomicBoolean();
        pusher.startPush(PARTITION_0, REPLICA_ID, newSession(
            (first, second) -> biCallbackFired.set(true),
            () -> runnableCallbackFired.set(true)));

        Set<MemoryRecords> appended = new HashSet<>();
        Set<MemoryRecords> countedDown = new HashSet<>();
        setUpRequestData(PARTITION_0, appended, countedDown);

        pusher.doWork();
        time.sleep(config.lingerMs() + 1);
        runnable.run();
        pusher.doWork();

        Assertions.assertEquals(appended, countedDown);
        Assertions.assertFalse(biCallbackFired.get());
        Assertions.assertTrue(runnableCallbackFired.get());
    }

    @Test
    public void testStaleLeaderEpochResponseIsNotProcessed() {
        testStaleResponseIsNotProcessed(true, false, false);
    }

    @Test
    public void testStaleReplicaEpochResponseIsNotProcessed() {
        testStaleResponseIsNotProcessed(false, true, false);
    }

    @Test
    public void testStaleSessionIdResponseIsNotProcessed() {
        testStaleResponseIsNotProcessed(false, false, true);
    }

    /**
     * Sends a request for one session, replaces that session with another one that differs in the
     * selected identifiers, then delivers the response to the original request and verifies that
     * no callback of either session fires.
     *
     * @param bumpLeaderEpoch whether the replacement session uses the original leader epoch + 1
     * @param bumpReplicaEpoch whether the replacement session uses the original replica epoch + 1
     * @param bumpSessionId whether the replacement session uses the original session id + 1
     */
    private void testStaleResponseIsNotProcessed(boolean bumpLeaderEpoch, boolean bumpReplicaEpoch, boolean bumpSessionId) {
        AtomicBoolean anyCallbackFired = new AtomicBoolean();
        PushSession original = newSession(
            (first, second) -> anyCallbackFired.set(true),
            () -> anyCallbackFired.set(true));
        pusher.startPush(PARTITION_0, REPLICA_ID, original);

        Set<MemoryRecords> appended = new HashSet<>();
        Set<MemoryRecords> countedDown = new HashSet<>();
        setUpRequestData(PARTITION_0, appended, countedDown);

        pusher.doWork();
        time.sleep(config.lingerMs());
        pusher.doWork();

        pusher.stopPush(PARTITION_0, REPLICA_ID, true);
        pusher.startPush(PARTITION_0, REPLICA_ID, new TestPushSession(
            bumpLeaderEpoch ? original.leaderEpoch() + 1 : original.leaderEpoch(),
            bumpReplicaEpoch ? original.replicaEpoch() + 1 : original.replicaEpoch(),
            bumpSessionId ? original.replicationSessionId() + 1 : original.replicationSessionId(),
            (first, second) -> anyCallbackFired.set(true),
            () -> anyCallbackFired.set(true)));

        client.respond(request -> request instanceof AppendRecordsRequest, newResponseData(PARTITION_0));
        pusher.doWork();

        Assertions.assertEquals(appended, countedDown);
        Assertions.assertFalse(anyCallbackFired.get());
    }

    /** A failed end-session request is queued again, still flagged as ending the session. */
    @Test
    public void testEndSessionResentOnRetriableError() {
        Deque<RequestAndCompletionHandler> pending =
            testEndReplicationSessionResponseError(this::disconnectAfterRetryTimeout);
        Assertions.assertNotNull(pending);
        Assertions.assertFalse(pending.isEmpty());

        // Explicit cast: the static type of request.build().data() may only be the generic
        // message interface, which does not expose topics().
        AppendRecordsRequestData requestData = (AppendRecordsRequestData) pending.peek().request.build().data();
        List<AppendRecordsRequestData.TopicData> topics = requestData.topics();
        Assertions.assertEquals(1, topics.size());

        List<AppendRecordsRequestData.PartitionData> partitions = topics.get(0).partitions();
        Assertions.assertEquals(1, partitions.size());

        AppendRecordsRequestData.PartitionData partition = partitions.get(0);
        Assertions.assertEquals(PARTITION_0.partition(), partition.partitionIndex());
        Assertions.assertTrue(partition.endReplicationSession());
    }

    @Test
    public void testEndSessionNotResentOnRetriableErrorButStaleLeaderEpoch() {
        AtomicBoolean newSessionCallbackFired = new AtomicBoolean();
        Deque<RequestAndCompletionHandler> pending = testEndReplicationSessionResponseError(() -> {
            pusher.startPush(PARTITION_0, REPLICA_ID, new TestPushSession(Integer.MAX_VALUE, 1L, 1L,
                (first, second) -> newSessionCallbackFired.set(true),
                () -> newSessionCallbackFired.set(true)));
            pusher.doWork();
            disconnectAfterRetryTimeout();
        });
        Assertions.assertTrue(pending != null && pending.isEmpty());
        Assertions.assertFalse(newSessionCallbackFired.get());
    }

    @Test
    public void testEndSessionNotResentOnRetriableErrorButStaleSessionId() {
        AtomicBoolean newSessionCallbackFired = new AtomicBoolean();
        Deque<RequestAndCompletionHandler> pending = testEndReplicationSessionResponseError(() -> {
            pusher.startPush(PARTITION_0, REPLICA_ID, new TestPushSession(LEADER_EPOCH, 1L, Long.MAX_VALUE,
                (first, second) -> newSessionCallbackFired.set(true),
                () -> newSessionCallbackFired.set(true)));
            pusher.doWork();
            disconnectAfterRetryTimeout();
        });
        Assertions.assertTrue(pending != null && pending.isEmpty());
        Assertions.assertFalse(newSessionCallbackFired.get());
    }

    @Test
    public void testEndSessionNotResentOnRetriableErrorButStaleReplicaEpoch() {
        AtomicBoolean newSessionCallbackFired = new AtomicBoolean();
        Deque<RequestAndCompletionHandler> pending = testEndReplicationSessionResponseError(() -> {
            pusher.startPush(PARTITION_0, REPLICA_ID, new TestPushSession(LEADER_EPOCH, Long.MAX_VALUE, 1L,
                (first, second) -> newSessionCallbackFired.set(true),
                () -> newSessionCallbackFired.set(true)));
            pusher.doWork();
            disconnectAfterRetryTimeout();
        }, false);
        Assertions.assertTrue(pending != null && pending.isEmpty());
        Assertions.assertFalse(newSessionCallbackFired.get());
    }

    @Test
    public void testEndSessionNotResentOnNonRetriableError() {
        Deque<RequestAndCompletionHandler> pending = testEndReplicationSessionResponseError(() -> client.respond(
            new AppendRecordsResponse(new AppendRecordsResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code()))));
        Assertions.assertTrue(pending != null && pending.isEmpty());
    }

    /** Same as the two-argument overload, expecting the {@link #PARTITION_1} session to be ended. */
    private Deque<RequestAndCompletionHandler> testEndReplicationSessionResponseError(Runnable runnable) {
        return testEndReplicationSessionResponseError(runnable, true);
    }

    /**
     * Shared body of the end-session tests.
     *
     * <p>Starts sessions on both partitions, appends records to each, stops the
     * {@link #PARTITION_0} session, lets the resulting request go out, and then runs
     * {@code runnable} to inject the response/error for it.
     *
     * @param runnable injects the response or failure into the mock client
     * @param z expected final state of the flag set by the {@link #PARTITION_1} session's
     *     {@code Runnable} callback
     * @return whatever {@code pusher.pendingRequests} holds for {@link #REPLICA_ID} afterwards
     *     (may be {@code null})
     */
    private Deque<RequestAndCompletionHandler> testEndReplicationSessionResponseError(Runnable runnable, boolean z) {
        AtomicBoolean unexpectedCallbackFired = new AtomicBoolean();
        AtomicBoolean otherSessionRunnableFired = new AtomicBoolean();
        PushSession stoppedSession = newSession(
            (first, second) -> unexpectedCallbackFired.set(true),
            () -> unexpectedCallbackFired.set(true));
        PushSession otherSession = newSession(
            (first, second) -> unexpectedCallbackFired.set(true),
            () -> otherSessionRunnableFired.set(true));
        pusher.startPush(PARTITION_0, REPLICA_ID, stoppedSession);
        pusher.startPush(PARTITION_1, REPLICA_ID, otherSession);

        Set<MemoryRecords> partition0Records = new HashSet<>();
        Set<MemoryRecords> countedDown = new HashSet<>();
        setUpRequestData(PARTITION_0, partition0Records, countedDown);
        Set<MemoryRecords> allRecords = new HashSet<>(partition0Records);
        setUpRequestData(PARTITION_1, allRecords, countedDown);

        pusher.stopPush(PARTITION_0, REPLICA_ID, true);
        pusher.doWork();
        // Only the stopped partition's records have been counted down at this point.
        Assertions.assertEquals(partition0Records, countedDown);

        time.sleep(config.lingerMs() + 1);
        pusher.doWork();
        runnable.run();
        pusher.doWork();

        Assertions.assertEquals(allRecords, countedDown);
        Assertions.assertFalse(unexpectedCallbackFired.get());
        Assertions.assertEquals(z, otherSessionRunnableFired.get());

        // NOTE(review): the declared type of pendingRequests is not visible from this file, so
        // the cast is kept; drop it if the map is already typed accordingly.
        @SuppressWarnings("unchecked")
        Deque<RequestAndCompletionHandler> pending =
            (Deque<RequestAndCompletionHandler>) pusher.pendingRequests.get(REPLICA_ID);
        return pending;
    }

    /**
     * One response carrying success for {@link #PARTITION_0} and {@code FENCED_LEADER_EPOCH} for
     * {@link #PARTITION_1}: the first session gets its {@code BiConsumer} invoked, the second its
     * {@code Runnable}, and all records of both partitions are counted down.
     */
    @Test
    public void testResponseWithPartialSuccess() {
        AtomicBoolean unexpectedCallbackFired = new AtomicBoolean();
        AtomicBoolean partition0BiCallbackFired = new AtomicBoolean();
        AtomicBoolean partition1RunnableFired = new AtomicBoolean();
        PushSession partition0Session = newSession(
            (first, second) -> partition0BiCallbackFired.set(true),
            () -> unexpectedCallbackFired.set(true));
        PushSession partition1Session = newSession(
            (first, second) -> unexpectedCallbackFired.set(true),
            () -> partition1RunnableFired.set(true));

        Map<TopicIdPartition, PushSession> sessions = new HashMap<>();
        sessions.put(PARTITION_0, partition0Session);
        sessions.put(PARTITION_1, partition1Session);

        Set<MemoryRecords> appended = new HashSet<>();
        Set<MemoryRecords> countedDown = new HashSet<>();
        for (Map.Entry<TopicIdPartition, PushSession> entry : sessions.entrySet()) {
            pusher.startPush(entry.getKey(), REPLICA_ID, entry.getValue());
            setUpRequestData(entry.getKey(), appended, countedDown);
        }

        pusher.doWork();
        time.sleep(config.lingerMs());
        client.prepareResponse(newResponseData(
            sessions.keySet(),
            Collections.singletonMap(PARTITION_1, Errors.FENCED_LEADER_EPOCH.code())));
        pusher.doWork();

        Assertions.assertEquals(appended, countedDown);
        Assertions.assertFalse(unexpectedCallbackFired.get());
        Assertions.assertTrue(partition0BiCallbackFired.get());
        Assertions.assertTrue(partition1RunnableFired.get());
    }

    /**
     * Appends {@link #LINGER_MS} batches of 128 pseudo-random bytes to {@code topicIdPartition}
     * via {@code onLeaderAppend} and (re)stubs {@code tracker.countDown} to record its argument.
     *
     * @param topicIdPartition partition the records are appended to
     * @param set receives every {@link MemoryRecords} instance that was appended
     * @param set2 receives every {@link MemoryRecords} instance later passed to
     *     {@code tracker.countDown}
     */
    private void setUpRequestData(TopicIdPartition topicIdPartition, Set<MemoryRecords> set, Set<MemoryRecords> set2) {
        for (int i = 0; i < LINGER_MS; i++) {
            byte[] payload = new byte[128];
            random.nextBytes(payload);
            MemoryRecords records = new MemoryRecords(ByteBuffer.wrap(payload));
            pusher.onLeaderAppend(topicIdPartition, REPLICA_ID, 0L, i + 1, records);
            set.add(records);
        }
        Mockito.when(tracker.countDown(ArgumentMatchers.any(MemoryRecords.class))).thenAnswer(invocation -> {
            set2.add(invocation.getArgument(0));
            return 0;
        });
    }

    /**
     * Advances the clock just past {@code retryTimeoutMs} and answers the oldest in-flight
     * request with a {@code null}-data response, passing {@code true} as the second argument of
     * {@code MockClient.respond} (presumably the "disconnected" flag - verify against MockClient).
     */
    private void disconnectAfterRetryTimeout() {
        time.sleep(config.retryTimeoutMs() + 1);
        client.respond(new AppendRecordsResponse((AppendRecordsResponseData) null), true);
    }

    /** Builds an error-free response covering just {@code topicIdPartition}. */
    private AppendRecordsResponse newResponseData(TopicIdPartition topicIdPartition) {
        return newResponseData(Collections.singleton(topicIdPartition), Collections.emptyMap());
    }

    /**
     * Builds a single-topic response with one entry per partition in {@code set}.
     *
     * @param set partitions to include
     * @param map error code per partition; partitions without an entry are reported as
     *     successful with log start offset 12 and log end offset 34
     */
    private AppendRecordsResponse newResponseData(Set<TopicIdPartition> set, Map<TopicIdPartition, Short> map) {
        List<AppendRecordsResponseData.PartitionData> partitions = new ArrayList<>();
        for (TopicIdPartition topicIdPartition : set) {
            AppendRecordsResponseData.PartitionData partitionData = new AppendRecordsResponseData.PartitionData();
            partitionData.setPartitionIndex(topicIdPartition.partition());
            short errorCode = map.getOrDefault(topicIdPartition, Errors.NONE.code());
            if (errorCode == Errors.NONE.code()) {
                partitionData.setLogEndOffset(34L);
                partitionData.setLogStartOffset(12L);
            } else {
                partitionData.setErrorCode(errorCode);
            }
            partitions.add(partitionData);
        }
        AppendRecordsResponseData.TopicData topicData =
            new AppendRecordsResponseData.TopicData().setTopicId(TOPIC_ID).setPartitions(partitions);
        return new AppendRecordsResponse(new AppendRecordsResponseData().setTopics(Collections.singletonList(topicData)));
    }

    /**
     * Creates a session with leader epoch {@link #LEADER_EPOCH}, replica epoch 1 and session id 1.
     *
     * @param biConsumer forwarded unchanged to {@link TestPushSession}
     * @param runnable forwarded unchanged to {@link TestPushSession}
     */
    private PushSession newSession(BiConsumer<Long, Long> biConsumer, Runnable runnable) {
        return new TestPushSession(LEADER_EPOCH, 1L, 1L, biConsumer, runnable);
    }
}
