package org.apache.kafka.coordinator.group.runtime;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.class */
public class CoordinatorRuntimeTest {
    private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$DirectEventProcessor.class */
    private static class DirectEventProcessor implements CoordinatorEventProcessor {
        private DirectEventProcessor() {
        }

        public void enqueue(CoordinatorEvent coordinatorEvent) throws RejectedExecutionException {
            try {
                coordinatorEvent.run();
            } catch (Throwable th) {
                coordinatorEvent.complete(th);
            }
        }

        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$ManualEventProcessor.class */
    private static class ManualEventProcessor implements CoordinatorEventProcessor {
        private Queue<CoordinatorEvent> queue;

        private ManualEventProcessor() {
            this.queue = new LinkedList();
        }

        public void enqueue(CoordinatorEvent coordinatorEvent) throws RejectedExecutionException {
            this.queue.add(coordinatorEvent);
        }

        public boolean poll() {
            CoordinatorEvent poll = this.queue.poll();
            if (poll == null) {
                return false;
            }
            try {
                poll.run();
                return true;
            } catch (Throwable th) {
                poll.complete(th);
                return true;
            }
        }

        public int size() {
            return this.queue.size();
        }

        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorLoader.class */
    private static class MockCoordinatorLoader implements CoordinatorLoader<String> {
        private MockCoordinatorLoader() {
        }

        public CompletableFuture<Void> load(TopicPartition topicPartition, CoordinatorPlayback<String> coordinatorPlayback) {
            return CompletableFuture.completedFuture(null);
        }

        public void close() throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorShard.class */
    public static class MockCoordinatorShard implements CoordinatorShard<String> {
        private final TimelineHashSet<String> records;
        private final CoordinatorTimer<Void, String> timer;

        MockCoordinatorShard(SnapshotRegistry snapshotRegistry, CoordinatorTimer<Void, String> coordinatorTimer) {
            this.records = new TimelineHashSet<>(snapshotRegistry, 0);
            this.timer = coordinatorTimer;
        }

        @Override // 
        public void replay(String str) throws RuntimeException {
            this.records.add(str);
        }

        Set<String> records() {
            return Collections.unmodifiableSet(new HashSet((Collection) this.records));
        }

        CoordinatorTimer<Void, String> timer() {
            return this.timer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorShardBuilder.class */
    public static class MockCoordinatorShardBuilder implements CoordinatorShardBuilder<MockCoordinatorShard, String> {
        private SnapshotRegistry snapshotRegistry;
        private CoordinatorTimer<Void, String> timer;

        private MockCoordinatorShardBuilder() {
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withLogContext(LogContext logContext) {
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(Time time) {
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(CoordinatorTimer<Void, String> coordinatorTimer) {
            this.timer = coordinatorTimer;
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(TopicPartition topicPartition) {
            return this;
        }

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public MockCoordinatorShard m45build() {
            return new MockCoordinatorShard((SnapshotRegistry) Objects.requireNonNull(this.snapshotRegistry), (CoordinatorTimer) Objects.requireNonNull(this.timer));
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockCoordinatorShardBuilderSupplier.class */
    private static class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
        private MockCoordinatorShardBuilderSupplier() {
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
            return new MockCoordinatorShardBuilder();
        }
    }

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest$MockPartitionWriter.class */
    private static class MockPartitionWriter extends InMemoryPartitionWriter<String> {
        private final int maxRecordsInBatch;

        public MockPartitionWriter() {
            this(Integer.MAX_VALUE);
        }

        public MockPartitionWriter(int i) {
            super(false);
            this.maxRecordsInBatch = i;
        }

        @Override // org.apache.kafka.coordinator.group.runtime.InMemoryPartitionWriter
        public void registerListener(TopicPartition topicPartition, PartitionWriter.Listener listener) {
            super.registerListener(topicPartition, listener);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.InMemoryPartitionWriter
        public void deregisterListener(TopicPartition topicPartition, PartitionWriter.Listener listener) {
            super.deregisterListener(topicPartition, listener);
        }

        @Override // org.apache.kafka.coordinator.group.runtime.InMemoryPartitionWriter
        public long append(TopicPartition topicPartition, List<String> list) throws KafkaException {
            if (list.size() <= this.maxRecordsInBatch) {
                return super.append(topicPartition, list);
            }
            throw new KafkaException(String.format("Number of records %d greater than the maximum allowed %d.", Integer.valueOf(list.size()), Integer.valueOf(this.maxRecordsInBatch)));
        }
    }

    @Test
    public void testScheduleLoading() {
        MockTimer mockTimer = new MockTimer();
        MockCoordinatorLoader mockCoordinatorLoader = (MockCoordinatorLoader) Mockito.mock(MockCoordinatorLoader.class);
        MockPartitionWriter mockPartitionWriter = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier mockCoordinatorShardBuilderSupplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder mockCoordinatorShardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard mockCoordinatorShard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(mockCoordinatorLoader).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(mockPartitionWriter).withCoordinatorShardBuilderSupplier(mockCoordinatorShardBuilderSupplier).build();
        Mockito.when(mockCoordinatorShardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.m45build()).thenReturn(mockCoordinatorShard);
        Mockito.when(mockCoordinatorShardBuilderSupplier.get()).thenReturn(mockCoordinatorShardBuilder);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(mockCoordinatorLoader.load(TP, mockCoordinatorShard)).thenReturn(completableFuture);
        Assertions.assertThrows(NotCoordinatorException.class, () -> {
            build.contextOrThrow(TP);
        });
        build.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, contextOrThrow.state);
        Assertions.assertEquals(0, contextOrThrow.epoch);
        Assertions.assertEquals(mockCoordinatorShard, contextOrThrow.coordinator);
        completableFuture.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow.state);
        ((MockCoordinatorShard) Mockito.verify(mockCoordinatorShard, Mockito.times(1))).onLoaded(MetadataImage.EMPTY);
        ((MockPartitionWriter) Mockito.verify(mockPartitionWriter, Mockito.times(1))).registerListener((TopicPartition) ArgumentMatchers.eq(TP), (PartitionWriter.Listener) ArgumentMatchers.any(PartitionWriter.Listener.class));
        ((MockCoordinatorShardBuilder) Mockito.verify(mockCoordinatorShardBuilder, Mockito.times(1))).withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.eq(contextOrThrow.snapshotRegistry));
        ((MockCoordinatorShardBuilder) Mockito.verify(mockCoordinatorShardBuilder, Mockito.times(1))).withLogContext((LogContext) ArgumentMatchers.eq(contextOrThrow.logContext));
        ((MockCoordinatorShardBuilder) Mockito.verify(mockCoordinatorShardBuilder, Mockito.times(1))).withTime((Time) ArgumentMatchers.eq(mockTimer.time()));
        ((MockCoordinatorShardBuilder) Mockito.verify(mockCoordinatorShardBuilder, Mockito.times(1))).withTimer((CoordinatorTimer) ArgumentMatchers.eq(contextOrThrow.timer));
    }

    @Test
    public void testScheduleLoadingWithFailure() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter mockPartitionWriter = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorLoader mockCoordinatorLoader = (MockCoordinatorLoader) Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier mockCoordinatorShardBuilderSupplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder mockCoordinatorShardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard mockCoordinatorShard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(mockCoordinatorLoader).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(mockPartitionWriter).withCoordinatorShardBuilderSupplier(mockCoordinatorShardBuilderSupplier).build();
        Mockito.when(mockCoordinatorShardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.m45build()).thenReturn(mockCoordinatorShard);
        Mockito.when(mockCoordinatorShardBuilderSupplier.get()).thenReturn(mockCoordinatorShardBuilder);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(mockCoordinatorLoader.load(TP, mockCoordinatorShard)).thenReturn(completableFuture);
        build.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, contextOrThrow.state);
        Assertions.assertEquals(0, contextOrThrow.epoch);
        Assertions.assertEquals(mockCoordinatorShard, contextOrThrow.coordinator);
        completableFuture.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, contextOrThrow.state);
        ((MockCoordinatorShard) Mockito.verify(mockCoordinatorShard, Mockito.times(1))).onUnloaded();
        ((MockPartitionWriter) Mockito.verify(mockPartitionWriter, Mockito.times(1))).deregisterListener((TopicPartition) ArgumentMatchers.eq(TP), (PartitionWriter.Listener) ArgumentMatchers.any(PartitionWriter.Listener.class));
    }

    @Test
    public void testScheduleLoadingWithStalePartitionEpoch() {
        MockTimer mockTimer = new MockTimer();
        MockCoordinatorLoader mockCoordinatorLoader = (MockCoordinatorLoader) Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier mockCoordinatorShardBuilderSupplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder mockCoordinatorShardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard mockCoordinatorShard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(mockCoordinatorLoader).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(mockCoordinatorShardBuilderSupplier).build();
        Mockito.when(mockCoordinatorShardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.m45build()).thenReturn(mockCoordinatorShard);
        Mockito.when(mockCoordinatorShardBuilderSupplier.get()).thenReturn(mockCoordinatorShardBuilder);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(mockCoordinatorLoader.load(TP, mockCoordinatorShard)).thenReturn(completableFuture);
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);
        Assertions.assertEquals(mockCoordinatorShard, contextOrThrow.coordinator);
        completableFuture.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);
        build.scheduleLoadOperation(TP, 0);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);
    }

    @Test
    public void testScheduleLoadingAfterLoadingFailure() {
        MockTimer mockTimer = new MockTimer();
        MockCoordinatorLoader mockCoordinatorLoader = (MockCoordinatorLoader) Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier mockCoordinatorShardBuilderSupplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder mockCoordinatorShardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard mockCoordinatorShard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(mockCoordinatorLoader).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(mockCoordinatorShardBuilderSupplier).build();
        Mockito.when(mockCoordinatorShardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.m45build()).thenReturn(mockCoordinatorShard);
        Mockito.when(mockCoordinatorShardBuilderSupplier.get()).thenReturn(mockCoordinatorShardBuilder);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(mockCoordinatorLoader.load(TP, mockCoordinatorShard)).thenReturn(completableFuture);
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);
        Assertions.assertEquals(mockCoordinatorShard, contextOrThrow.coordinator);
        completableFuture.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, contextOrThrow.state);
        ((MockCoordinatorShard) Mockito.verify(mockCoordinatorShard, Mockito.times(1))).onUnloaded();
        MockCoordinatorShard mockCoordinatorShard2 = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        Mockito.when(mockCoordinatorShardBuilder.m45build()).thenReturn(mockCoordinatorShard2);
        CompletableFuture completableFuture2 = new CompletableFuture();
        Mockito.when(mockCoordinatorLoader.load(TP, mockCoordinatorShard2)).thenReturn(completableFuture2);
        build.scheduleLoadOperation(TP, 11);
        CoordinatorRuntime.CoordinatorContext contextOrThrow2 = build.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, contextOrThrow2.state);
        Assertions.assertEquals(11, contextOrThrow2.epoch);
        Assertions.assertEquals(mockCoordinatorShard2, contextOrThrow2.coordinator);
        completableFuture2.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow2.state);
    }

    @Test
    public void testScheduleUnloading() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter mockPartitionWriter = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier mockCoordinatorShardBuilderSupplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder mockCoordinatorShardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard mockCoordinatorShard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(mockPartitionWriter).withCoordinatorShardBuilderSupplier(mockCoordinatorShardBuilderSupplier).build();
        Mockito.when(mockCoordinatorShardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.m45build()).thenReturn(mockCoordinatorShard);
        Mockito.when(mockCoordinatorShardBuilderSupplier.get()).thenReturn(mockCoordinatorShardBuilder);
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);
        build.scheduleUnloadOperation(TP, contextOrThrow.epoch + 1);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, contextOrThrow.state);
        ((MockCoordinatorShard) Mockito.verify(mockCoordinatorShard, Mockito.times(1))).onUnloaded();
        ((MockPartitionWriter) Mockito.verify(mockPartitionWriter, Mockito.times(1))).deregisterListener((TopicPartition) ArgumentMatchers.eq(TP), (PartitionWriter.Listener) ArgumentMatchers.any(PartitionWriter.Listener.class));
        Assertions.assertThrows(NotCoordinatorException.class, () -> {
            build.contextOrThrow(TP);
        });
    }

    @Test
    public void testScheduleUnloadingWithStalePartitionEpoch() {
        MockTimer mockTimer = new MockTimer();
        MockCoordinatorShardBuilderSupplier mockCoordinatorShardBuilderSupplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder mockCoordinatorShardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard mockCoordinatorShard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(mockCoordinatorShardBuilderSupplier).build();
        Mockito.when(mockCoordinatorShardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.m45build()).thenReturn(mockCoordinatorShard);
        Mockito.when(mockCoordinatorShardBuilderSupplier.get()).thenReturn(mockCoordinatorShardBuilder);
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);
        build.scheduleUnloadOperation(TP, 0);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, contextOrThrow.state);
        Assertions.assertEquals(10, contextOrThrow.epoch);
    }

    @Test
    public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter mockPartitionWriter = new MockPartitionWriter();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(mockPartitionWriter).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Collections.singletonList(0L), contextOrThrow.snapshotRegistry.epochsList());
        CompletableFuture scheduleWriteOperation = build.scheduleWriteOperation("write#1", TP, mockCoordinatorShard -> {
            return new CoordinatorResult(Arrays.asList("record1", "record2"), "response1");
        });
        Assertions.assertFalse(scheduleWriteOperation.isDone());
        Assertions.assertEquals(2L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Arrays.asList(0L, 2L), contextOrThrow.snapshotRegistry.epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
        Assertions.assertEquals(Arrays.asList("record1", "record2"), mockPartitionWriter.records(TP));
        CompletableFuture scheduleWriteOperation2 = build.scheduleWriteOperation("write#2", TP, mockCoordinatorShard2 -> {
            return new CoordinatorResult(Arrays.asList("record3"), "response2");
        });
        Assertions.assertFalse(scheduleWriteOperation2.isDone());
        Assertions.assertEquals(3L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Arrays.asList(0L, 2L, 3L), contextOrThrow.snapshotRegistry.epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2", "record3"}), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
        Assertions.assertEquals(Arrays.asList("record1", "record2", "record3"), mockPartitionWriter.records(TP));
        CompletableFuture scheduleWriteOperation3 = build.scheduleWriteOperation("write#3", TP, mockCoordinatorShard3 -> {
            return new CoordinatorResult(Collections.emptyList(), "response3");
        });
        Assertions.assertFalse(scheduleWriteOperation3.isDone());
        Assertions.assertEquals(3L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Arrays.asList(0L, 2L, 3L), contextOrThrow.snapshotRegistry.epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2", "record3"}), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
        Assertions.assertEquals(Arrays.asList("record1", "record2", "record3"), mockPartitionWriter.records(TP));
        mockPartitionWriter.commit(TP, 2L);
        Assertions.assertTrue(scheduleWriteOperation.isDone());
        Assertions.assertEquals("response1", scheduleWriteOperation.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(2L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Arrays.asList(2L, 3L), contextOrThrow.snapshotRegistry.epochsList());
        mockPartitionWriter.commit(TP, 3L);
        Assertions.assertTrue(scheduleWriteOperation2.isDone());
        Assertions.assertTrue(scheduleWriteOperation3.isDone());
        Assertions.assertEquals("response2", scheduleWriteOperation2.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response3", scheduleWriteOperation3.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(3L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Collections.singletonList(3L), contextOrThrow.snapshotRegistry.epochsList());
        CompletableFuture scheduleWriteOperation4 = build.scheduleWriteOperation("write#4", TP, mockCoordinatorShard4 -> {
            return new CoordinatorResult(Collections.emptyList(), "response4");
        });
        Assertions.assertTrue(scheduleWriteOperation4.isDone());
        Assertions.assertEquals("response4", scheduleWriteOperation4.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(Collections.singletonList(3L), contextOrThrow.snapshotRegistry.epochsList());
    }

    @Test
    public void testScheduleWriteOpWhenInactive() {
        MockTimer mockTimer = new MockTimer();
        TestUtils.assertFutureThrows(new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build().scheduleWriteOperation("write", TP, mockCoordinatorShard -> {
            return new CoordinatorResult(Collections.emptyList(), "response1");
        }), NotCoordinatorException.class);
    }

    @Test
    public void testScheduleWriteOpWhenOpFails() {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        TestUtils.assertFutureThrows(build.scheduleWriteOperation("write", TP, mockCoordinatorShard -> {
            throw new KafkaException("error");
        }), KafkaException.class);
    }

    @Test
    public void testScheduleWriteOpWhenReplayFails() {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Collections.singletonList(0L), contextOrThrow.snapshotRegistry.epochsList());
        contextOrThrow.coordinator = new MockCoordinatorShard(contextOrThrow.snapshotRegistry, contextOrThrow.timer) { // from class: org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.1
            @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShard
            public void replay(String str) throws RuntimeException {
                throw new IllegalArgumentException("error");
            }
        };
        TestUtils.assertFutureThrows(build.scheduleWriteOperation("write", TP, mockCoordinatorShard -> {
            return new CoordinatorResult(Arrays.asList("record1", "record2"), "response1");
        }), IllegalArgumentException.class);
        Assertions.assertEquals(0L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Collections.singletonList(0L), contextOrThrow.snapshotRegistry.epochsList());
    }

    @Test
    public void testScheduleWriteOpWhenWriteFails() {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter(2)).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Collections.singletonList(0L), contextOrThrow.snapshotRegistry.epochsList());
        build.scheduleWriteOperation("write#1", TP, mockCoordinatorShard -> {
            return new CoordinatorResult(Arrays.asList("record1", "record2"), "response1");
        });
        Assertions.assertEquals(2L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Arrays.asList(0L, 2L), contextOrThrow.snapshotRegistry.epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
        TestUtils.assertFutureThrows(build.scheduleWriteOperation("write#2", TP, mockCoordinatorShard2 -> {
            return new CoordinatorResult(Arrays.asList("record3", "record4", "record5"), "response2");
        }), KafkaException.class);
        Assertions.assertEquals(2L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(Arrays.asList(0L, 2L), contextOrThrow.snapshotRegistry.epochsList());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
    }

    @Test
    public void testScheduleReadOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter mockPartitionWriter = new MockPartitionWriter();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(mockPartitionWriter).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        CompletableFuture scheduleWriteOperation = build.scheduleWriteOperation("write#1", TP, mockCoordinatorShard -> {
            return new CoordinatorResult(Arrays.asList("record1", "record2"), "response1");
        });
        CompletableFuture scheduleWriteOperation2 = build.scheduleWriteOperation("write#2", TP, mockCoordinatorShard2 -> {
            return new CoordinatorResult(Arrays.asList("record3", "record4"), "response2");
        });
        mockPartitionWriter.commit(TP, 2L);
        Assertions.assertTrue(scheduleWriteOperation.isDone());
        Assertions.assertFalse(scheduleWriteOperation2.isDone());
        Assertions.assertEquals(4L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(2L, contextOrThrow.lastCommittedOffset);
        CompletableFuture scheduleReadOperation = build.scheduleReadOperation("read", TP, (mockCoordinatorShard3, j) -> {
            Assertions.assertEquals(contextOrThrow.lastCommittedOffset, j);
            return "read-response";
        });
        Assertions.assertTrue(scheduleReadOperation.isDone());
        Assertions.assertEquals("read-response", scheduleReadOperation.get(5L, TimeUnit.SECONDS));
    }

    @Test
    public void testScheduleReadOpWhenPartitionInactive() {
        MockTimer mockTimer = new MockTimer();
        TestUtils.assertFutureThrows(new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build().scheduleReadOperation("read", TP, (mockCoordinatorShard, j) -> {
            return "read-response";
        }), NotCoordinatorException.class);
    }

    @Test
    public void testScheduleReadOpWhenOpsFails() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter mockPartitionWriter = new MockPartitionWriter();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(mockPartitionWriter).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        build.scheduleWriteOperation("write#1", TP, mockCoordinatorShard -> {
            return new CoordinatorResult(Arrays.asList("record1", "record2"), "response1");
        });
        build.scheduleWriteOperation("write#2", TP, mockCoordinatorShard2 -> {
            return new CoordinatorResult(Arrays.asList("record3", "record4"), "response2");
        });
        mockPartitionWriter.commit(TP, 2L);
        TestUtils.assertFutureThrows(build.scheduleReadOperation("read", TP, (mockCoordinatorShard3, j) -> {
            Assertions.assertEquals(contextOrThrow.lastCommittedOffset, j);
            throw new IllegalArgumentException("error");
        }), IllegalArgumentException.class);
    }

    @Test
    public void testClose() throws Exception {
        MockCoordinatorLoader mockCoordinatorLoader = (MockCoordinatorLoader) Mockito.spy(new MockCoordinatorLoader());
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(mockCoordinatorLoader).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        CompletableFuture scheduleWriteOperation = build.scheduleWriteOperation("write#1", TP, mockCoordinatorShard -> {
            return new CoordinatorResult(Arrays.asList("record1", "record2"), "response1");
        });
        CompletableFuture scheduleWriteOperation2 = build.scheduleWriteOperation("write#2", TP, mockCoordinatorShard2 -> {
            return new CoordinatorResult(Arrays.asList("record3", "record4"), "response2");
        });
        Assertions.assertFalse(scheduleWriteOperation.isDone());
        Assertions.assertFalse(scheduleWriteOperation2.isDone());
        Assertions.assertEquals(0, contextOrThrow.timer.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.SECONDS, true, () -> {
            return new CoordinatorResult(Arrays.asList("record5", "record6"), (CompletableFuture) null);
        });
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        build.close();
        TestUtils.assertFutureThrows(scheduleWriteOperation, NotCoordinatorException.class);
        TestUtils.assertFutureThrows(scheduleWriteOperation2, NotCoordinatorException.class);
        ((MockCoordinatorLoader) Mockito.verify(mockCoordinatorLoader)).close();
        Assertions.assertEquals(0, contextOrThrow.timer.size());
    }

    @Test
    public void testOnNewMetadataImage() {
        TopicPartition topicPartition = new TopicPartition("__consumer_offsets", 0);
        TopicPartition topicPartition2 = new TopicPartition("__consumer_offsets", 1);
        MockTimer mockTimer = new MockTimer();
        MockCoordinatorLoader mockCoordinatorLoader = (MockCoordinatorLoader) Mockito.mock(MockCoordinatorLoader.class);
        MockPartitionWriter mockPartitionWriter = (MockPartitionWriter) Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier mockCoordinatorShardBuilderSupplier = (MockCoordinatorShardBuilderSupplier) Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder mockCoordinatorShardBuilder = (MockCoordinatorShardBuilder) Mockito.mock(MockCoordinatorShardBuilder.class);
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(mockCoordinatorLoader).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(mockPartitionWriter).withCoordinatorShardBuilderSupplier(mockCoordinatorShardBuilderSupplier).build();
        MockCoordinatorShard mockCoordinatorShard = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        MockCoordinatorShard mockCoordinatorShard2 = (MockCoordinatorShard) Mockito.mock(MockCoordinatorShard.class);
        Mockito.when(mockCoordinatorShardBuilderSupplier.get()).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withSnapshotRegistry((SnapshotRegistry) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withLogContext((LogContext) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTimer((CoordinatorTimer) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTopicPartition((TopicPartition) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.withTime((Time) ArgumentMatchers.any())).thenReturn(mockCoordinatorShardBuilder);
        Mockito.when(mockCoordinatorShardBuilder.m45build()).thenReturn(mockCoordinatorShard).thenReturn(mockCoordinatorShard2);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(mockCoordinatorLoader.load(topicPartition, mockCoordinatorShard)).thenReturn(completableFuture);
        CompletableFuture completableFuture2 = new CompletableFuture();
        Mockito.when(mockCoordinatorLoader.load(topicPartition2, mockCoordinatorShard2)).thenReturn(completableFuture2);
        build.scheduleLoadOperation(topicPartition, 0);
        build.scheduleLoadOperation(topicPartition2, 0);
        completableFuture.complete(null);
        ((MockCoordinatorShard) Mockito.verify(mockCoordinatorShard)).onLoaded(MetadataImage.EMPTY);
        MetadataDelta metadataDelta = new MetadataDelta(MetadataImage.EMPTY);
        MetadataImage apply = metadataDelta.apply(MetadataProvenance.EMPTY);
        build.onNewMetadataImage(apply, metadataDelta);
        ((MockCoordinatorShard) Mockito.verify(mockCoordinatorShard)).onNewMetadataImage(apply, metadataDelta);
        completableFuture2.complete(null);
        ((MockCoordinatorShard) Mockito.verify(mockCoordinatorShard2)).onLoaded(apply);
    }

    @Test
    public void testScheduleTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0L, contextOrThrow.lastWrittenOffset);
        Assertions.assertEquals(0L, contextOrThrow.lastCommittedOffset);
        Assertions.assertEquals(0, contextOrThrow.timer.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            return new CoordinatorResult(Arrays.asList("record1", "record2"), (CompletableFuture) null);
        });
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-2", 20L, TimeUnit.MILLISECONDS, true, () -> {
            return new CoordinatorResult(Arrays.asList("record3", "record4"), (CompletableFuture) null);
        });
        Assertions.assertEquals(2, contextOrThrow.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2"}), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(Utils.mkSet(new String[]{"record1", "record2", "record3", "record4"}), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
        Assertions.assertEquals(0, contextOrThrow.timer.size());
    }

    @Test
    public void testRescheduleTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        ManualEventProcessor manualEventProcessor = new ManualEventProcessor();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(manualEventProcessor).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        manualEventProcessor.poll();
        manualEventProcessor.poll();
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0, contextOrThrow.timer.size());
        Assertions.assertEquals(0, manualEventProcessor.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            return new CoordinatorResult(Collections.singletonList("record1"), (CompletableFuture) null);
        });
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, manualEventProcessor.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            return new CoordinatorResult(Collections.singletonList("record2"), (CompletableFuture) null);
        });
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            return new CoordinatorResult(Collections.singletonList("record3"), (CompletableFuture) null);
        });
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(2, manualEventProcessor.size());
        Assertions.assertTrue(manualEventProcessor.poll());
        Assertions.assertTrue(manualEventProcessor.poll());
        Assertions.assertEquals(Utils.mkSet(new String[]{"record3"}), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
        Assertions.assertEquals(0, contextOrThrow.timer.size());
    }

    @Test
    public void testCancelTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        ManualEventProcessor manualEventProcessor = new ManualEventProcessor();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(manualEventProcessor).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        manualEventProcessor.poll();
        manualEventProcessor.poll();
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0, contextOrThrow.timer.size());
        Assertions.assertEquals(0, manualEventProcessor.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            return new CoordinatorResult(Collections.singletonList("record1"), (CompletableFuture) null);
        });
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, manualEventProcessor.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            return new CoordinatorResult(Collections.singletonList("record2"), (CompletableFuture) null);
        });
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.cancel("timer-1");
        Assertions.assertEquals(0, contextOrThrow.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, manualEventProcessor.size());
        Assertions.assertTrue(manualEventProcessor.poll());
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) contextOrThrow.coordinator).records());
        Assertions.assertEquals(0, contextOrThrow.timer.size());
    }

    @Test
    public void testRetryableTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0, contextOrThrow.timer.size());
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            atomicInteger.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        mockTimer.advanceClock(501L);
        Assertions.assertEquals(2, atomicInteger.get());
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        mockTimer.advanceClock(501L);
        Assertions.assertEquals(3, atomicInteger.get());
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.cancel("timer-1");
        Assertions.assertEquals(0, contextOrThrow.timer.size());
    }

    @Test
    public void testNonRetryableTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime build = new CoordinatorRuntime.Builder().withTime(mockTimer.time()).withTimer(mockTimer).withLoader(new MockCoordinatorLoader()).withEventProcessor(new DirectEventProcessor()).withPartitionWriter(new MockPartitionWriter()).withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()).build();
        build.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext contextOrThrow = build.contextOrThrow(TP);
        Assertions.assertEquals(0, contextOrThrow.timer.size());
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ((MockCoordinatorShard) contextOrThrow.coordinator).timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, false, () -> {
            atomicInteger.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, contextOrThrow.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(0, contextOrThrow.timer.size());
    }
}
