package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link FetchBuffer}.
 *
 * <p>Each test creates its own buffer and closes it before returning. The shared fixtures
 * ({@link LogContext}, {@link SubscriptionState}, {@link FetchMetricsManager}) are rebuilt
 * before every test by {@link #setup()} and are only used to construct {@link CompletedFetch}
 * instances to put into the buffer.
 */
public class FetchBufferTest {

    /** Mock clock shared by the metrics fixture and the timer used in {@link #testWakeup()}. */
    private final Time time = new MockTime(0, 0, 0);

    private final TopicPartition topicAPartition0 = new TopicPartition("topic-a", 0);
    private final TopicPartition topicAPartition1 = new TopicPartition("topic-a", 1);
    private final TopicPartition topicAPartition2 = new TopicPartition("topic-a", 2);

    /** All three partitions of {@code topic-a} declared above. */
    private final Set<TopicPartition> allPartitions =
            partitions(topicAPartition0, topicAPartition1, topicAPartition2);

    private LogContext logContext;
    private SubscriptionState subscriptions;
    private FetchMetricsManager metricsManager;

    /**
     * Rebuilds the per-test fixtures from a minimal consumer configuration.
     */
    @BeforeEach
    public void setup() {
        logContext = new LogContext();

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // These keys configure deserializers, so they must name a deserializer class
        // (the previous code passed StringSerializer here).
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        ConsumerConfig config = new ConsumerConfig(props);

        subscriptions = ConsumerUtils.createSubscriptionState(config, logContext);
        metricsManager = ConsumerUtils.createFetchMetricsManager(ConsumerUtils.createMetrics(config, time));
    }

    /**
     * Verifies that a fetch added to an empty buffer is visible through {@code peek()},
     * is returned (same instance) by {@code poll()}, and is gone afterwards.
     */
    @Test
    public void testBasicPeekAndPoll() {
        try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
            CompletedFetch completedFetch = completedFetch(topicAPartition0);
            Assertions.assertTrue(fetchBuffer.isEmpty());

            fetchBuffer.add(completedFetch);
            Assertions.assertTrue(fetchBuffer.hasCompletedFetches(ignored -> true));
            Assertions.assertFalse(fetchBuffer.isEmpty());
            Assertions.assertNotNull(fetchBuffer.peek());
            // peek() must not remove the element: a second peek still sees the same instance.
            Assertions.assertSame(completedFetch, fetchBuffer.peek());
            Assertions.assertSame(completedFetch, fetchBuffer.poll());
            Assertions.assertNull(fetchBuffer.peek());
        }
    }

    /**
     * Verifies that {@code close()} drops both the queued fetch and the next-in-line fetch.
     */
    @Test
    public void testCloseClearsData() {
        // Deliberately not try-with-resources: the buffer has to stay in scope after close()
        // so that its post-close state can be asserted.
        FetchBuffer fetchBuffer = new FetchBuffer(logContext);
        try {
            Assertions.assertNull(fetchBuffer.nextInLineFetch());
            Assertions.assertTrue(fetchBuffer.isEmpty());

            fetchBuffer.add(completedFetch(topicAPartition0));
            Assertions.assertFalse(fetchBuffer.isEmpty());

            fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
            Assertions.assertNotNull(fetchBuffer.nextInLineFetch());
        } finally {
            fetchBuffer.close();
        }

        Assertions.assertNull(fetchBuffer.nextInLineFetch());
        Assertions.assertTrue(fetchBuffer.isEmpty());
    }

    /**
     * Verifies that {@code bufferedPartitions()} reports the partition of the next-in-line
     * fetch together with the partitions of all queued fetches, and shrinks as they are removed.
     */
    @Test
    public void testBufferedPartitions() {
        try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
            fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
            fetchBuffer.add(completedFetch(topicAPartition1));
            fetchBuffer.add(completedFetch(topicAPartition2));
            Assertions.assertEquals(allPartitions, fetchBuffer.bufferedPartitions());

            // Clearing the next-in-line fetch removes partition 0 from the reported set.
            fetchBuffer.setNextInLineFetch((CompletedFetch) null);
            Assertions.assertEquals(
                    partitions(topicAPartition1, topicAPartition2), fetchBuffer.bufferedPartitions());

            fetchBuffer.poll();
            Assertions.assertEquals(partitions(topicAPartition2), fetchBuffer.bufferedPartitions());

            fetchBuffer.poll();
            Assertions.assertEquals(partitions(), fetchBuffer.bufferedPartitions());
        }
    }

    /**
     * Verifies that {@code addAll()} queues every given fetch and that {@code retainAll()}
     * keeps only the fetches (next-in-line included) whose partitions are in the given set.
     */
    @Test
    public void testAddAllAndRetainAll() {
        try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
            fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
            fetchBuffer.addAll(Arrays.asList(
                    completedFetch(topicAPartition1),
                    completedFetch(topicAPartition2)));
            Assertions.assertEquals(allPartitions, fetchBuffer.bufferedPartitions());

            fetchBuffer.retainAll(partitions(topicAPartition1, topicAPartition2));
            Assertions.assertEquals(
                    partitions(topicAPartition1, topicAPartition2), fetchBuffer.bufferedPartitions());

            fetchBuffer.retainAll(partitions(topicAPartition2));
            Assertions.assertEquals(partitions(topicAPartition2), fetchBuffer.bufferedPartitions());

            fetchBuffer.retainAll(partitions());
            Assertions.assertEquals(partitions(), fetchBuffer.bufferedPartitions());
        }
    }

    /**
     * Verifies that {@code wakeup()} releases a thread that is waiting in
     * {@code awaitNotEmpty()} on a buffer that never receives data.
     *
     * @throws Exception if the test thread is interrupted while joining the waiter
     */
    @Test
    public void testWakeup() throws Exception {
        try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
            Thread waiter = new Thread(
                    () -> fetchBuffer.awaitNotEmpty(time.timer(Duration.ofMinutes(1L))));
            waiter.start();
            fetchBuffer.wakeup();

            // The join timeout only bounds how long a broken wakeup can stall the test run.
            waiter.join(Duration.ofSeconds(30L).toMillis());
            Assertions.assertFalse(waiter.isAlive());
        }
    }

    /**
     * Builds a {@link CompletedFetch} for the given partition backed by a default
     * (empty) {@link FetchResponseData.PartitionData}.
     *
     * @param topicPartition partition the fetch is attributed to
     * @return a new completed fetch at fetch offset 0 using the latest FETCH API version
     */
    private CompletedFetch completedFetch(TopicPartition topicPartition) {
        FetchMetricsAggregator metricsAggregator = new FetchMetricsAggregator(metricsManager, allPartitions);
        return new CompletedFetch(
                logContext,
                subscriptions,
                BufferSupplier.create(),
                topicPartition,
                new FetchResponseData.PartitionData(),
                metricsAggregator,
                0L,
                ApiKeys.FETCH.latestVersion());
    }

    /**
     * Collects the given partitions into a mutable set.
     *
     * @param topicPartitions partitions to include; may be empty
     * @return a new {@link HashSet} containing exactly the given partitions
     */
    private static Set<TopicPartition> partitions(TopicPartition... topicPartitions) {
        return new HashSet<>(Arrays.asList(topicPartitions));
    }
}
