/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.runtime;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEvent;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorPlayback;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilder;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.group.runtime.InMemoryPartitionWriter;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.coordinator.group.runtime.SnapshottableCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.metadata.MetadataEncryptorFactory;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CoordinatorRuntimeTest {
    private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
    private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5L);

    @Test
    public void testScheduleLoading() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = (MockCoordinatorLoader)Mockito.mock(MockCoordinatorLoader.class);
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withLoader((CoordinatorLoader)loader).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        CompletableFuture<Object> future = new CompletableFuture<Object>();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)TP), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP))))).thenReturn(future);
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)coordinator, (Object)ctx.coordinator.coordinator());
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.LOADING, (Object)ctx.state);
        Assertions.assertEquals((int)0, (int)ctx.epoch);
        Assertions.assertEquals((Object)coordinator, (Object)ctx.coordinator.coordinator());
        future.complete(null);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)1))).onLoaded(MetadataImage.EMPTY);
        ((MockPartitionWriter)Mockito.verify((Object)writer, (VerificationMode)Mockito.times((int)1))).registerListener((TopicPartition)ArgumentMatchers.eq((Object)TP), (PartitionWriter.Listener)ArgumentMatchers.any(PartitionWriter.Listener.class));
        ((MockCoordinatorShardBuilder)Mockito.verify((Object)builder, (VerificationMode)Mockito.times((int)1))).withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.eq((Object)ctx.coordinator.snapshotRegistry()));
        ((MockCoordinatorShardBuilder)Mockito.verify((Object)builder, (VerificationMode)Mockito.times((int)1))).withLogContext((LogContext)ArgumentMatchers.eq((Object)ctx.logContext));
        ((MockCoordinatorShardBuilder)Mockito.verify((Object)builder, (VerificationMode)Mockito.times((int)1))).withTime((Time)ArgumentMatchers.eq((Object)timer.time()));
        ((MockCoordinatorShardBuilder)Mockito.verify((Object)builder, (VerificationMode)Mockito.times((int)1))).withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.eq((Object)ctx.timer)));
    }

    @Test
    public void testScheduleLoadingWithFailure() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorLoader loader = (MockCoordinatorLoader)Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withLoader((CoordinatorLoader)loader).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        CompletableFuture future = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)TP), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP))))).thenReturn(future);
        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.LOADING, (Object)ctx.state);
        Assertions.assertEquals((int)0, (int)ctx.epoch);
        Assertions.assertEquals((Object)coordinator, (Object)ctx.coordinator.coordinator());
        future.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.FAILED, (Object)ctx.state);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)1))).onUnloaded();
        ((MockPartitionWriter)Mockito.verify((Object)writer, (VerificationMode)Mockito.times((int)1))).deregisterListener((TopicPartition)ArgumentMatchers.eq((Object)TP), (PartitionWriter.Listener)ArgumentMatchers.any(PartitionWriter.Listener.class));
    }

    @Test
    public void testScheduleLoadingWithStalePartitionEpoch() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = (MockCoordinatorLoader)Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withLoader((CoordinatorLoader)loader).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        CompletableFuture<Object> future = new CompletableFuture<Object>();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)TP), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP))))).thenReturn(future);
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.LOADING, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
        Assertions.assertEquals((Object)coordinator, (Object)ctx.coordinator.coordinator());
        future.complete(null);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
        runtime.scheduleLoadOperation(TP, 0);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
    }

    @Test
    public void testScheduleLoadingAfterLoadingFailure() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = (MockCoordinatorLoader)Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withLoader((CoordinatorLoader)loader).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        CompletableFuture future = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)TP), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP))))).thenReturn(future);
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.LOADING, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
        Assertions.assertEquals((Object)coordinator, (Object)ctx.coordinator.coordinator());
        future.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.FAILED, (Object)ctx.state);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)1))).onUnloaded();
        coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        future = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)TP), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP))))).thenReturn(future);
        runtime.scheduleLoadOperation(TP, 11);
        ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.LOADING, (Object)ctx.state);
        Assertions.assertEquals((int)11, (int)ctx.epoch);
        Assertions.assertEquals((Object)coordinator, (Object)ctx.coordinator.coordinator());
        future.complete(null);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
    }

    @Test
    public void testScheduleUnloading() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.CLOSED, (Object)ctx.state);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)1))).onUnloaded();
        ((MockPartitionWriter)Mockito.verify((Object)writer, (VerificationMode)Mockito.times((int)1))).deregisterListener((TopicPartition)ArgumentMatchers.eq((Object)TP), (PartitionWriter.Listener)ArgumentMatchers.any(PartitionWriter.Listener.class));
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    @Test
    public void testScheduleUnloadingWithEmptyEpoch() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
        runtime.scheduleUnloadOperation(TP, OptionalInt.empty());
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.CLOSED, (Object)ctx.state);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)1))).onUnloaded();
        ((MockPartitionWriter)Mockito.verify((Object)writer, (VerificationMode)Mockito.times((int)1))).deregisterListener((TopicPartition)ArgumentMatchers.eq((Object)TP), (PartitionWriter.Listener)ArgumentMatchers.any(PartitionWriter.Listener.class));
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    @Test
    public void testScheduleUnloadingWhenContextDoesntExist() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(11));
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)0))).onUnloaded();
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    @Test
    public void testScheduleUnloadingWithStalePartitionEpoch() {
        MockTimer timer = new MockTimer();
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(0));
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
    }

    @Test
    public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        Assertions.assertFalse((boolean)write1.isDone());
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Arrays.asList(InMemoryPartitionWriter.LogEntry.value("record1"), InMemoryPartitionWriter.LogEntry.value("record2")), writer.entries(TP));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record3"), (Object)"response2"));
        Assertions.assertFalse((boolean)write2.isDone());
        Assertions.assertEquals((long)3L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L, 3L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2", "record3"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Arrays.asList(InMemoryPartitionWriter.LogEntry.value("record1"), InMemoryPartitionWriter.LogEntry.value("record2"), InMemoryPartitionWriter.LogEntry.value("record3")), writer.entries(TP));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Collections.emptyList(), (Object)"response3"));
        Assertions.assertFalse((boolean)write3.isDone());
        Assertions.assertEquals((long)3L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L, 3L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2", "record3"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Arrays.asList(InMemoryPartitionWriter.LogEntry.value("record1"), InMemoryPartitionWriter.LogEntry.value("record2"), InMemoryPartitionWriter.LogEntry.value("record3")), writer.entries(TP));
        writer.commit(TP, 2L);
        Assertions.assertTrue((boolean)write1.isDone());
        Assertions.assertEquals((Object)"response1", write1.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(2L, 3L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        writer.commit(TP, 3L);
        Assertions.assertTrue((boolean)write2.isDone());
        Assertions.assertTrue((boolean)write3.isDone());
        Assertions.assertEquals((Object)"response2", write2.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals((Object)"response3", write3.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals((long)3L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(3L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Collections.emptyList(), (Object)"response4"));
        Assertions.assertTrue((boolean)write4.isDone());
        Assertions.assertEquals((Object)"response4", write4.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(Collections.singletonList(3L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
    }

    @Test
    public void testScheduleWriteOpWhenInactive() {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Collections.emptyList(), (Object)"response1"));
        TestUtils.assertFutureThrows((Future)write, NotCoordinatorException.class);
    }

    @Test
    public void testScheduleWriteOpWhenOpFails() {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> {
            throw new KafkaException("error");
        });
        TestUtils.assertFutureThrows((Future)write, KafkaException.class);
    }

    @Test
    public void testScheduleWriteOpWhenReplayFails() {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
        ctx.coordinator = new SnapshottableCoordinator(new LogContext(), snapshotRegistry, (CoordinatorShard)new MockCoordinatorShard(snapshotRegistry, (CoordinatorTimer)ctx.timer){

            @Override
            public void replay(long offset, long producerId, short producerEpoch, String record) throws RuntimeException {
                throw new IllegalArgumentException("error");
            }
        }, TP);
        CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        TestUtils.assertFutureThrows((Future)write, IllegalArgumentException.class);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
    }

    @Test
    public void testScheduleWriteOpWhenWriteFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter(2);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record3", "record4", "record5"), (Object)"response2"));
        TestUtils.assertFutureThrows((Future)write2, KafkaException.class);
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
    }

    @Test
    public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3L), state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        timer.advanceClock(4L);
        TestUtils.assertFutureThrows((Future)timedOutWrite, org.apache.kafka.common.errors.TimeoutException.class);
    }

    @Test
    public void testScheduleTransactionalWriteOp() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        final MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        final MockCoordinatorShardBuilder shardBuilder = new MockCoordinatorShardBuilder(){

            @Override
            public MockCoordinatorShard build() {
                return coordinator;
            }
        };
        MockCoordinatorShardBuilderSupplier shardBuilderSupplier = new MockCoordinatorShardBuilderSupplier(){

            @Override
            public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
                return shardBuilder;
            }
        };
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)shardBuilderSupplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        ((MockPartitionWriter)Mockito.verify((Object)writer, (VerificationMode)Mockito.times((int)1))).registerListener((TopicPartition)ArgumentMatchers.eq((Object)TP), (PartitionWriter.Listener)ArgumentMatchers.any());
        VerificationGuard guard = new VerificationGuard();
        Mockito.when(writer.maybeStartTransactionVerification(TP, "transactional-id", 100L, (short)50)).thenReturn(CompletableFuture.completedFuture(guard));
        runtime.scheduleTransactionalWriteOperation("tnx-write", TP, "transactional-id", 100L, (short)50, Duration.ofMillis(5000L), state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response"));
        ((MockPartitionWriter)Mockito.verify((Object)writer, (VerificationMode)Mockito.times((int)1))).append((TopicPartition)ArgumentMatchers.eq((Object)TP), ArgumentMatchers.eq((long)100L), ArgumentMatchers.eq((short)50), (VerificationGuard)ArgumentMatchers.eq((Object)guard), (List)ArgumentMatchers.eq(Arrays.asList("record1", "record2")));
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)1))).replay(ArgumentMatchers.eq((long)0L), ArgumentMatchers.eq((long)100L), ArgumentMatchers.eq((short)50), (String)ArgumentMatchers.eq((Object)"record1"));
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)1))).replay(ArgumentMatchers.eq((long)1L), ArgumentMatchers.eq((long)100L), ArgumentMatchers.eq((short)50), (String)ArgumentMatchers.eq((Object)"record2"));
    }

    @Test
    public void testScheduleTransactionalWriteOpWhenVerificationFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        final MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        final MockCoordinatorShardBuilder shardBuilder = new MockCoordinatorShardBuilder(){

            @Override
            public MockCoordinatorShard build() {
                return coordinator;
            }
        };
        MockCoordinatorShardBuilderSupplier shardBuilderSupplier = new MockCoordinatorShardBuilderSupplier(){

            @Override
            public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
                return shardBuilder;
            }
        };
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)shardBuilderSupplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        ((MockPartitionWriter)Mockito.verify((Object)writer, (VerificationMode)Mockito.times((int)1))).registerListener((TopicPartition)ArgumentMatchers.eq((Object)TP), (PartitionWriter.Listener)ArgumentMatchers.any());
        Mockito.when(writer.maybeStartTransactionVerification(TP, "transactional-id", 100L, (short)50)).thenReturn((Object)FutureUtils.failedFuture((Throwable)Errors.NOT_ENOUGH_REPLICAS.exception()));
        CompletableFuture future = runtime.scheduleTransactionalWriteOperation("tnx-write", TP, "transactional-id", 100L, (short)50, Duration.ofMillis(5000L), state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response"));
        TestUtils.assertFutureThrows((Future)future, NotEnoughReplicasException.class);
        ((MockPartitionWriter)Mockito.verify((Object)writer, (VerificationMode)Mockito.times((int)0))).append((TopicPartition)ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyShort(), (VerificationGuard)ArgumentMatchers.any(), (List)ArgumentMatchers.any());
    }

    @ParameterizedTest
    @EnumSource(value=TransactionResult.class)
    public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        CompletableFuture write1 = runtime.scheduleTransactionalWriteOperation("write#1", TP, "transactional-id", 100L, (short)5, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        Assertions.assertFalse((boolean)write1.isDone());
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Arrays.asList(InMemoryPartitionWriter.LogEntry.value(100L, (short)5, "record1"), InMemoryPartitionWriter.LogEntry.value(100L, (short)5, "record2")), writer.entries(TP));
        CompletableFuture complete1 = runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short)5, 10, result, DEFAULT_WRITE_TIMEOUT);
        Assertions.assertFalse((boolean)complete1.isDone());
        Assertions.assertEquals((long)3L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L, 3L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        if (result == TransactionResult.COMMIT) {
            Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        } else {
            Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        }
        Assertions.assertEquals(Arrays.asList(InMemoryPartitionWriter.LogEntry.value(100L, (short)5, "record1"), InMemoryPartitionWriter.LogEntry.value(100L, (short)5, "record2"), InMemoryPartitionWriter.LogEntry.control(100L, (short)5, 10, result)), writer.entries(TP));
        writer.commit(TP, 2L);
        Assertions.assertTrue((boolean)write1.isDone());
        Assertions.assertEquals((Object)"response1", write1.get(5L, TimeUnit.SECONDS));
        writer.commit(TP, 3L);
        Assertions.assertTrue((boolean)complete1.isDone());
        Assertions.assertNull(complete1.get(5L, TimeUnit.SECONDS));
    }

    @Test
    public void testScheduleTransactionCompletionWhenWriteTimesOut() throws InterruptedException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        CompletableFuture timedOutCompletion = runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short)5, 10, TransactionResult.COMMIT, Duration.ofMillis(3L));
        Assertions.assertEquals((long)1L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 1L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        timer.advanceClock(4L);
        TestUtils.assertFutureThrows((Future)timedOutCompletion, org.apache.kafka.common.errors.TimeoutException.class);
        Assertions.assertEquals((long)1L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 1L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
    }

    @Test
    public void testScheduleTransactionCompletionWhenWriteFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter(true);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        runtime.scheduleTransactionalWriteOperation("write#1", TP, "transactional-id", 100L, (short)5, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        CompletableFuture complete1 = runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short)5, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        TestUtils.assertFutureThrows((Future)complete1, KafkaException.class);
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
    }

    @Test
    public void testScheduleTransactionCompletionWhenReplayFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Collections.singletonList(0L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
        ctx.coordinator = new SnapshottableCoordinator(new LogContext(), snapshotRegistry, (CoordinatorShard)new MockCoordinatorShard(snapshotRegistry, (CoordinatorTimer)ctx.timer){

            @Override
            public void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) throws RuntimeException {
                throw new IllegalArgumentException("error");
            }
        }, TP);
        runtime.scheduleTransactionalWriteOperation("write#1", TP, "transactional-id", 100L, (short)5, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Arrays.asList(InMemoryPartitionWriter.LogEntry.value(100L, (short)5, "record1"), InMemoryPartitionWriter.LogEntry.value(100L, (short)5, "record2")), writer.entries(TP));
        CompletableFuture complete1 = runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short)5, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        TestUtils.assertFutureThrows((Future)complete1, IllegalArgumentException.class);
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(Arrays.asList(0L, 2L), (Object)ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Arrays.asList(InMemoryPartitionWriter.LogEntry.value(100L, (short)5, "record1"), InMemoryPartitionWriter.LogEntry.value(100L, (short)5, "record2")), writer.entries(TP));
    }

    @Test
    public void testScheduleReadOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record3", "record4"), (Object)"response2"));
        writer.commit(TP, 2L);
        Assertions.assertTrue((boolean)write1.isDone());
        Assertions.assertFalse((boolean)write2.isDone());
        Assertions.assertEquals((long)4L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastCommittedOffset());
        CompletableFuture read = runtime.scheduleReadOperation("read", TP, (state, offset) -> {
            Assertions.assertEquals((long)ctx.coordinator.lastCommittedOffset(), (long)offset);
            return "read-response";
        });
        Assertions.assertTrue((boolean)read.isDone());
        Assertions.assertEquals((Object)"read-response", read.get(5L, TimeUnit.SECONDS));
    }

    @Test
    public void testScheduleReadOpWhenPartitionInactive() {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        CompletableFuture read = runtime.scheduleReadOperation("read", TP, (state, offset) -> "read-response");
        TestUtils.assertFutureThrows((Future)read, NotCoordinatorException.class);
    }

    @Test
    public void testScheduleReadOpWhenOpsFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record3", "record4"), (Object)"response2"));
        writer.commit(TP, 2L);
        CompletableFuture read = runtime.scheduleReadOperation("read", TP, (state, offset) -> {
            Assertions.assertEquals((long)ctx.coordinator.lastCommittedOffset(), (long)offset);
            throw new IllegalArgumentException("error");
        });
        TestUtils.assertFutureThrows((Future)read, IllegalArgumentException.class);
    }

    @Test
    public void testClose() throws Exception {
        MockCoordinatorLoader loader = (MockCoordinatorLoader)Mockito.spy((Object)new MockCoordinatorLoader());
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withLoader((CoordinatorLoader)loader).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record1", "record2"), (Object)"response1"));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Arrays.asList("record3", "record4"), (Object)"response2"));
        Assertions.assertFalse((boolean)write1.isDone());
        Assertions.assertFalse((boolean)write2.isDone());
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.SECONDS, true, () -> new CoordinatorResult(Arrays.asList("record5", "record6"), null));
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        runtime.close();
        TestUtils.assertFutureThrows((Future)write1, NotCoordinatorException.class);
        TestUtils.assertFutureThrows((Future)write2, NotCoordinatorException.class);
        ((MockCoordinatorLoader)Mockito.verify((Object)loader)).close();
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
    }

    @Test
    public void testOnNewMetadataImage() {
        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = (MockCoordinatorLoader)Mockito.mock(MockCoordinatorLoader.class);
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withLoader((CoordinatorLoader)loader).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        MockCoordinatorShard coordinator0 = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        MockCoordinatorShard coordinator1 = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator0).thenReturn((Object)coordinator1);
        CompletableFuture<Object> future0 = new CompletableFuture<Object>();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)tp0), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, tp0))))).thenReturn(future0);
        CompletableFuture<Object> future1 = new CompletableFuture<Object>();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)tp1), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, tp1))))).thenReturn(future1);
        runtime.scheduleLoadOperation(tp0, 0);
        runtime.scheduleLoadOperation(tp1, 0);
        Assertions.assertEquals((Object)coordinator0, (Object)runtime.contextOrThrow((TopicPartition)tp0).coordinator.coordinator());
        Assertions.assertEquals((Object)coordinator1, (Object)runtime.contextOrThrow((TopicPartition)tp1).coordinator.coordinator());
        future0.complete(null);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator0)).onLoaded(MetadataImage.EMPTY);
        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY, __ -> null, new MetadataEncryptorFactory());
        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
        runtime.onNewMetadataImage(newImage, delta);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator0)).onNewMetadataImage(newImage, delta);
        future1.complete(null);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator1)).onLoaded(newImage);
    }

    @Test
    public void testScheduleTimer() throws InterruptedException {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(Duration.ofMillis(30L)).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(Arrays.asList("record1", "record2"), null));
        ctx.timer.schedule("timer-2", 20L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(Arrays.asList("record3", "record4"), null));
        Assertions.assertEquals((int)2, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record1", "record2", "record3", "record4"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
    }

    @Test
    public void testRescheduleTimer() throws InterruptedException {
        MockTimer timer = new MockTimer();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)processor).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
        Assertions.assertEquals((int)0, (int)processor.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(Collections.singletonList("record1"), null));
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((int)1, (int)processor.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(Collections.singletonList("record2"), null));
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(Collections.singletonList("record3"), null));
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((int)2, (int)processor.size());
        Assertions.assertTrue((boolean)processor.poll());
        Assertions.assertTrue((boolean)processor.poll());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new String[]{"record3"}), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
    }

    @Test
    public void testCancelTimer() throws InterruptedException {
        MockTimer timer = new MockTimer();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)processor).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
        Assertions.assertEquals((int)0, (int)processor.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(Collections.singletonList("record1"), null));
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((int)1, (int)processor.size());
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(Collections.singletonList("record2"), null));
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        ctx.timer.cancel("timer-1");
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((int)1, (int)processor.size());
        Assertions.assertTrue((boolean)processor.poll());
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)ctx.coordinator.coordinator()).records());
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
    }

    @Test
    public void testRetryableTimer() throws InterruptedException {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
        AtomicInteger cnt = new AtomicInteger(0);
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            cnt.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((int)1, (int)cnt.get());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(501L);
        Assertions.assertEquals((int)2, (int)cnt.get());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(501L);
        Assertions.assertEquals((int)3, (int)cnt.get());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        ctx.timer.cancel("timer-1");
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
    }

    @Test
    public void testRetryableTimerWithCustomBackoff() throws InterruptedException {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
        AtomicInteger cnt = new AtomicInteger(0);
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, 1000L, () -> {
            cnt.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((int)1, (int)cnt.get());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(501L);
        Assertions.assertEquals((int)1, (int)cnt.get());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(501L);
        Assertions.assertEquals((int)2, (int)cnt.get());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(501L);
        Assertions.assertEquals((int)2, (int)cnt.get());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(501L);
        Assertions.assertEquals((int)3, (int)cnt.get());
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        ctx.timer.cancel("timer-1");
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
    }

    @Test
    public void testNonRetryableTimer() throws InterruptedException {
        MockTimer timer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)new MockPartitionWriter()).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class)).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
        AtomicInteger cnt = new AtomicInteger(0);
        ctx.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, false, () -> {
            cnt.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals((int)1, (int)ctx.timer.size());
        timer.advanceClock(11L);
        Assertions.assertEquals((int)1, (int)cnt.get());
        Assertions.assertEquals((int)0, (int)ctx.timer.size());
    }

    @Test
    public void testStateChanges() throws Exception {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorLoader loader = (MockCoordinatorLoader)Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        GroupCoordinatorRuntimeMetrics runtimeMetrics = (GroupCoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)loader).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)runtimeMetrics).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        CompletableFuture future = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)TP), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP))))).thenReturn(future);
        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.LOADING, (Object)ctx.state);
        ((GroupCoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.INITIAL, CoordinatorRuntime.CoordinatorState.LOADING);
        future.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.FAILED, (Object)ctx.state);
        ((GroupCoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.LOADING, CoordinatorRuntime.CoordinatorState.FAILED);
        TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
        future = new CompletableFuture();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)tp), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, tp))))).thenReturn(future);
        runtime.scheduleLoadOperation(tp, 0);
        ctx = runtime.contextOrThrow(tp);
        Assertions.assertEquals((Object)coordinator, (Object)ctx.coordinator.coordinator());
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.LOADING, (Object)ctx.state);
        ((GroupCoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)2))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.INITIAL, CoordinatorRuntime.CoordinatorState.LOADING);
        future.complete(null);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        ((GroupCoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.LOADING, CoordinatorRuntime.CoordinatorState.ACTIVE);
        runtime.close();
        ((GroupCoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.FAILED, CoordinatorRuntime.CoordinatorState.CLOSED);
        ((GroupCoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)1))).recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.ACTIVE, CoordinatorRuntime.CoordinatorState.CLOSED);
    }

    @Test
    public void testPartitionLoadSensor() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        GroupCoordinatorRuntimeMetrics runtimeMetrics = (GroupCoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class);
        long startTimeMs = timer.time().milliseconds();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader(new CoordinatorLoader.LoadSummary(startTimeMs, startTimeMs + 1000L, startTimeMs + 500L, 30L, 3000L), Collections.emptyList(), Collections.emptyList())).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)runtimeMetrics).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        ((GroupCoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)1))).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000L);
    }

    @Test
    public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        GroupCoordinatorRuntimeMetrics runtimeMetrics = (GroupCoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime(Time.SYSTEM).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader(new CoordinatorLoader.LoadSummary(1000L, 2000L, 1500L, 30L, 3000L), Arrays.asList(5L, 15L, 27L), Arrays.asList(5L, 15L))).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)runtimeMetrics).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((long)27L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)15L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertFalse((boolean)ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
        Assertions.assertFalse((boolean)ctx.coordinator.snapshotRegistry().hasSnapshot(5L));
        Assertions.assertTrue((boolean)ctx.coordinator.snapshotRegistry().hasSnapshot(15L));
        Assertions.assertTrue((boolean)ctx.coordinator.snapshotRegistry().hasSnapshot(27L));
    }

    @Test
    public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        GroupCoordinatorRuntimeMetrics runtimeMetrics = (GroupCoordinatorRuntimeMetrics)Mockito.mock(GroupCoordinatorRuntimeMetrics.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime(Time.SYSTEM).withTimer((Timer)timer).withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT).withLoader((CoordinatorLoader)new MockCoordinatorLoader(new CoordinatorLoader.LoadSummary(1000L, 2000L, 1500L, 30L, 3000L), Collections.emptyList(), Collections.emptyList())).withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor()).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier).withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)runtimeMetrics).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class)).build();
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        runtime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertTrue((boolean)ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
    }

    private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(CoordinatorRuntime<S, U> runtime, TopicPartition tp) {
        return c -> c.equals(runtime.contextOrThrow((TopicPartition)tp).coordinator);
    }

    private static class MockCoordinatorShardBuilderSupplier
    implements CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
        private MockCoordinatorShardBuilderSupplier() {
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
            return new MockCoordinatorShardBuilder();
        }
    }

    private static class MockCoordinatorShardBuilder
    implements CoordinatorShardBuilder<MockCoordinatorShard, String> {
        private SnapshotRegistry snapshotRegistry;
        private CoordinatorTimer<Void, String> timer;

        private MockCoordinatorShardBuilder() {
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withLogContext(LogContext logContext) {
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(Time time) {
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(CoordinatorTimer<Void, String> timer) {
            this.timer = timer;
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
            return this;
        }

        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(TopicPartition topicPartition) {
            return this;
        }

        public MockCoordinatorShard build() {
            return new MockCoordinatorShard(Objects.requireNonNull(this.snapshotRegistry), Objects.requireNonNull(this.timer));
        }
    }

    static class MockCoordinatorShard
    implements CoordinatorShard<String> {
        private final SnapshotRegistry snapshotRegistry;
        private final TimelineHashSet<String> records;
        private final TimelineHashMap<Long, TimelineHashSet<String>> pendingRecords;
        private final CoordinatorTimer<Void, String> timer;

        MockCoordinatorShard(SnapshotRegistry snapshotRegistry, CoordinatorTimer<Void, String> timer) {
            this.snapshotRegistry = snapshotRegistry;
            this.records = new TimelineHashSet(snapshotRegistry, 0);
            this.pendingRecords = new TimelineHashMap(snapshotRegistry, 0);
            this.timer = timer;
        }

        public void replay(long offset, long producerId, short producerEpoch, String record) throws RuntimeException {
            if (producerId == -1L) {
                this.records.add((Object)record);
            } else {
                ((TimelineHashSet)this.pendingRecords.computeIfAbsent((Object)producerId, __ -> new TimelineHashSet(this.snapshotRegistry, 0))).add((Object)record);
            }
        }

        public void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) throws RuntimeException {
            if (result == TransactionResult.COMMIT) {
                TimelineHashSet pending = (TimelineHashSet)this.pendingRecords.remove((Object)producerId);
                if (pending == null) {
                    return;
                }
                this.records.addAll((Collection)pending);
            } else {
                this.pendingRecords.remove((Object)producerId);
            }
        }

        Set<String> pendingRecords(long producerId) {
            TimelineHashSet pending = (TimelineHashSet)this.pendingRecords.get((Object)producerId);
            if (pending == null) {
                return Collections.emptySet();
            }
            return Collections.unmodifiableSet(new HashSet(pending));
        }

        Set<String> records() {
            return Collections.unmodifiableSet(new HashSet<String>((Collection<String>)this.records));
        }

        CoordinatorTimer<Void, String> timer() {
            return this.timer;
        }
    }

    private static class MockPartitionWriter
    extends InMemoryPartitionWriter<String> {
        private final int maxRecordsInBatch;
        private final boolean failEndMarker;

        public MockPartitionWriter() {
            this(Integer.MAX_VALUE, false);
        }

        public MockPartitionWriter(int maxRecordsInBatch) {
            this(maxRecordsInBatch, false);
        }

        public MockPartitionWriter(boolean failEndMarker) {
            this(Integer.MAX_VALUE, failEndMarker);
        }

        public MockPartitionWriter(int maxRecordsInBatch, boolean failEndMarker) {
            super(false);
            this.maxRecordsInBatch = maxRecordsInBatch;
            this.failEndMarker = failEndMarker;
        }

        @Override
        public void registerListener(TopicPartition tp, PartitionWriter.Listener listener) {
            super.registerListener(tp, listener);
        }

        @Override
        public void deregisterListener(TopicPartition tp, PartitionWriter.Listener listener) {
            super.deregisterListener(tp, listener);
        }

        @Override
        public long append(TopicPartition tp, long producerId, short producerEpoch, VerificationGuard verificationGuard, List<String> records) throws KafkaException {
            if (records.size() <= this.maxRecordsInBatch) {
                return super.append(tp, producerId, producerEpoch, verificationGuard, records);
            }
            throw new KafkaException(String.format("Number of records %d greater than the maximum allowed %d.", records.size(), this.maxRecordsInBatch));
        }

        @Override
        public long appendEndTransactionMarker(TopicPartition tp, long producerId, short producerEpoch, int coordinatorEpoch, TransactionResult result) throws KafkaException {
            if (this.failEndMarker) {
                throw new KafkaException("Can't write end marker.");
            }
            return super.appendEndTransactionMarker(tp, producerId, producerEpoch, coordinatorEpoch, result);
        }
    }

    private static class MockCoordinatorLoader
    implements CoordinatorLoader<String> {
        private final CoordinatorLoader.LoadSummary summary;
        private final List<Long> lastWrittenOffsets;
        private final List<Long> lastCommittedOffsets;

        public MockCoordinatorLoader(CoordinatorLoader.LoadSummary summary, List<Long> lastWrittenOffsets, List<Long> lastCommittedOffsets) {
            this.summary = summary;
            this.lastWrittenOffsets = lastWrittenOffsets;
            this.lastCommittedOffsets = lastCommittedOffsets;
        }

        public MockCoordinatorLoader() {
            this(null, Collections.emptyList(), Collections.emptyList());
        }

        public CompletableFuture<CoordinatorLoader.LoadSummary> load(TopicPartition tp, CoordinatorPlayback<String> replayable) {
            this.lastWrittenOffsets.forEach(arg_0 -> replayable.updateLastWrittenOffset(arg_0));
            this.lastCommittedOffsets.forEach(arg_0 -> replayable.updateLastCommittedOffset(arg_0));
            return CompletableFuture.completedFuture(this.summary);
        }

        public void close() throws Exception {
        }
    }

    private static class ManualEventProcessor
    implements CoordinatorEventProcessor {
        private Queue<CoordinatorEvent> queue = new LinkedList<CoordinatorEvent>();

        private ManualEventProcessor() {
        }

        public void enqueue(CoordinatorEvent event) throws RejectedExecutionException {
            this.queue.add(event);
        }

        public boolean poll() {
            CoordinatorEvent event = this.queue.poll();
            if (event == null) {
                return false;
            }
            try {
                event.run();
            }
            catch (Throwable ex) {
                event.complete(ex);
            }
            return true;
        }

        public int size() {
            return this.queue.size();
        }

        public void close() throws Exception {
        }
    }

    private static class DirectEventProcessor
    implements CoordinatorEventProcessor {
        private DirectEventProcessor() {
        }

        public void enqueue(CoordinatorEvent event) throws RejectedExecutionException {
            try {
                event.run();
            }
            catch (Throwable ex) {
                event.complete(ex);
            }
        }

        public void close() throws Exception {
        }
    }
}

