/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.common.runtime;

import java.lang.invoke.CallSite;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEvent;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorPlayback;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilder;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.common.runtime.InMemoryPartitionWriter;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.common.runtime.Serializer;
import org.apache.kafka.coordinator.common.runtime.SnapshottableCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CoordinatorRuntimeTest {
    // Topic partition (partition 0 of "__consumer_offsets") that the tests below load and unload.
    private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
    // Write timeout of 5 ms, passed to the runtime builder via withDefaultWriteTimeOut in the tests below.
    private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5L);
    // Latest version of the TXN_OFFSET_COMMIT API as reported by ApiKeys.
    private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = ApiKeys.TXN_OFFSET_COMMIT.latestVersion();

    /**
     * Varargs convenience overload: gathers the given values into a list and delegates to
     * {@code records(long, List)}.
     *
     * @param timestamp timestamp forwarded unchanged to the list-based overload
     * @param records   record values, in order
     * @return whatever the list-based overload returns for the same values
     */
    private static MemoryRecords records(long timestamp, String ... records) {
        List<String> values = Arrays.stream(records).collect(Collectors.toList());
        return CoordinatorRuntimeTest.records(timestamp, values);
    }

    /**
     * Builds a {@link MemoryRecords} batch containing one {@link SimpleRecord} per input string.
     *
     * <p>Each string is encoded with the platform default charset and stamped with
     * {@code timestamp}. An empty input yields {@link MemoryRecords#EMPTY}.
     *
     * @param timestamp timestamp applied to every record and passed to the batch builder
     * @param records   record values, in order
     * @return the built batch, or {@code MemoryRecords.EMPTY} when {@code records} is empty
     */
    private static MemoryRecords records(long timestamp, List<String> records) {
        if (records.isEmpty()) {
            return MemoryRecords.EMPTY;
        }

        // Convert every string into a SimpleRecord carrying the shared timestamp.
        List<SimpleRecord> simpleRecords = new ArrayList<>(records.size());
        for (String value : records) {
            simpleRecords.add(new SimpleRecord(timestamp, value.getBytes(Charset.defaultCharset())));
        }

        // Size the buffer from the estimate for the current record version, uncompressed.
        byte magic = (byte) RecordVersion.current().value;
        int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, CompressionType.NONE, simpleRecords);
        ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);

        // The -1L / (short) -1 pair sits where transactionalRecords passes its producer id and
        // epoch, and the boolean flag is false here (true there).
        MemoryRecordsBuilder builder = MemoryRecords.builder(
            buffer,
            magic,
            Compression.NONE,
            TimestampType.CREATE_TIME,
            0L,
            timestamp,
            -1L,
            (short) -1,
            0,
            false,
            -1
        );
        for (SimpleRecord simpleRecord : simpleRecords) {
            builder.append(simpleRecord);
        }
        return builder.build();
    }

    /**
     * Varargs convenience overload: gathers the given values into a list and delegates to
     * {@code transactionalRecords(long, short, long, List)}.
     *
     * @param producerId    producer id forwarded unchanged
     * @param producerEpoch producer epoch forwarded unchanged
     * @param timestamp     timestamp forwarded unchanged
     * @param records       record values, in order
     * @return whatever the list-based overload returns for the same arguments
     */
    private static MemoryRecords transactionalRecords(long producerId, short producerEpoch, long timestamp, String ... records) {
        List<String> values = Arrays.stream(records).collect(Collectors.toList());
        return CoordinatorRuntimeTest.transactionalRecords(producerId, producerEpoch, timestamp, values);
    }

    /**
     * Builds a {@link MemoryRecords} batch for the given producer, containing one
     * {@link SimpleRecord} per input string.
     *
     * <p>Each string is encoded with the platform default charset and stamped with
     * {@code timestamp}. An empty input yields {@link MemoryRecords#EMPTY}.
     *
     * @param producerId    producer id passed to the batch builder
     * @param producerEpoch producer epoch passed to the batch builder
     * @param timestamp     timestamp applied to every record and passed to the batch builder
     * @param records       record values, in order
     * @return the built batch, or {@code MemoryRecords.EMPTY} when {@code records} is empty
     */
    private static MemoryRecords transactionalRecords(long producerId, short producerEpoch, long timestamp, List<String> records) {
        if (records.isEmpty()) {
            return MemoryRecords.EMPTY;
        }

        // Convert every string into a SimpleRecord carrying the shared timestamp.
        List<SimpleRecord> simpleRecords = new ArrayList<>(records.size());
        for (String value : records) {
            simpleRecords.add(new SimpleRecord(timestamp, value.getBytes(Charset.defaultCharset())));
        }

        // Size the buffer from the estimate for the current record version, uncompressed.
        byte magic = (byte) RecordVersion.current().value;
        int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, CompressionType.NONE, simpleRecords);
        ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);

        // Same builder call as records(), but with the real producer id/epoch and the boolean
        // flag set to true (presumably the "is transactional" marker -- confirm against
        // MemoryRecords.builder).
        MemoryRecordsBuilder builder = MemoryRecords.builder(
            buffer,
            magic,
            Compression.NONE,
            TimestampType.CREATE_TIME,
            0L,
            timestamp,
            producerId,
            producerEpoch,
            0,
            true,
            -1
        );
        for (SimpleRecord simpleRecord : simpleRecords) {
            builder.append(simpleRecord);
        }
        return builder.build();
    }

    /**
     * Builds a {@link MemoryRecords} holding a single end-transaction marker.
     *
     * @param producerId       producer id passed to {@code MemoryRecords.withEndTransactionMarker}
     * @param producerEpoch    producer epoch passed to {@code MemoryRecords.withEndTransactionMarker}
     * @param timestamp        timestamp passed to {@code MemoryRecords.withEndTransactionMarker}
     * @param coordinatorEpoch coordinator epoch stored in the {@link EndTransactionMarker}
     * @param result           control record type stored in the {@link EndTransactionMarker}
     * @return the records produced by {@code MemoryRecords.withEndTransactionMarker}
     */
    private static MemoryRecords endTransactionMarker(long producerId, short producerEpoch, long timestamp, int coordinatorEpoch, ControlRecordType result) {
        EndTransactionMarker marker = new EndTransactionMarker(result, coordinatorEpoch);
        return MemoryRecords.withEndTransactionMarker(timestamp, producerId, producerEpoch, marker);
    }

    /**
     * Happy path of {@code scheduleLoadOperation}.
     *
     * <p>Checks that no context exists before the load is scheduled, that scheduling creates a
     * context in LOADING state with the requested epoch and the shard produced by the mocked
     * builder, and that completing the loader future moves the context to ACTIVE. Finally
     * verifies the interactions with the shard, the partition writer and the shard builder.
     */
    @Test
    public void testScheduleLoading() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = Mockito.mock(MockCoordinatorLoader.class);
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Every fluent setter of the mocked builder returns the builder itself; build() yields the mocked shard.
        Mockito.when(builder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withExecutor(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // The loader hands back a future that the test completes by hand.
        CompletableFuture<Object> future = new CompletableFuture<Object>();
        Mockito.when(loader.load(
            ArgumentMatchers.eq(TP),
            (CoordinatorPlayback<String>) ((CoordinatorPlayback) ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP)))
        )).thenReturn(future);

        // No context exists before a load has been scheduled.
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));

        runtime.scheduleLoadOperation(TP, 0);

        // The context now exists and is loading with epoch 0 and the mocked shard.
        // (The original asserted the shard twice in a row; the duplicate was dropped.)
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(coordinator, ctx.coordinator.coordinator());
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
        Assertions.assertEquals(0, ctx.epoch);

        // Completing the load makes the coordinator active.
        future.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);

        // The shard was notified once and a listener was registered once for the partition.
        Mockito.verify(coordinator, Mockito.times(1)).onLoaded(MetadataImage.EMPTY);
        Mockito.verify(writer, Mockito.times(1)).registerListener(
            ArgumentMatchers.eq(TP),
            ArgumentMatchers.any(PartitionWriter.Listener.class)
        );

        // The builder received the context's snapshot registry, log context, time and timer.
        Mockito.verify(builder, Mockito.times(1)).withSnapshotRegistry(ArgumentMatchers.eq(ctx.coordinator.snapshotRegistry()));
        Mockito.verify(builder, Mockito.times(1)).withLogContext(ArgumentMatchers.eq(ctx.logContext));
        Mockito.verify(builder, Mockito.times(1)).withTime(ArgumentMatchers.eq(timer.time()));
        Mockito.verify(builder, Mockito.times(1)).withTimer((CoordinatorTimer<Void, String>) ((CoordinatorTimer) ArgumentMatchers.eq((Object) ctx.timer)));
    }

    /**
     * Failure path of {@code scheduleLoadOperation}.
     *
     * <p>Schedules a load, checks the context is LOADING, then completes the loader future
     * exceptionally and expects the context to be FAILED and the shard's {@code onUnloaded}
     * to have been called once.
     */
    @Test
    public void testScheduleLoadingWithFailure() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorLoader loader = Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Every fluent setter of the mocked builder returns the builder itself; build() yields the mocked shard.
        Mockito.when(builder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withExecutor(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // The loader hands back a future that the test fails by hand.
        CompletableFuture loadFuture = new CompletableFuture();
        Mockito.when(loader.load(
            ArgumentMatchers.eq(TP),
            (CoordinatorPlayback<String>) ((CoordinatorPlayback) ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP)))
        )).thenReturn(loadFuture);

        runtime.scheduleLoadOperation(TP, 0);

        // The context is loading with epoch 0 and the mocked shard.
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, context.state);
        Assertions.assertEquals(0, context.epoch);
        Assertions.assertEquals(coordinator, context.coordinator.coordinator());

        // Failing the load moves the context to FAILED and unloads the shard.
        loadFuture.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, context.state);
        Mockito.verify(coordinator, Mockito.times(1)).onUnloaded();
    }

    /**
     * Loading with an epoch lower than the current one must not disturb the context.
     *
     * <p>Loads the partition with epoch 10 until it is ACTIVE, then schedules another load with
     * epoch 0 and expects the state to remain ACTIVE and the epoch to remain 10.
     */
    @Test
    public void testScheduleLoadingWithStalePartitionEpoch() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Every fluent setter of the mocked builder returns the builder itself; build() yields the mocked shard.
        Mockito.when(builder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withExecutor(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // The loader hands back a future that the test completes by hand.
        CompletableFuture<Object> loadFuture = new CompletableFuture<Object>();
        Mockito.when(loader.load(
            ArgumentMatchers.eq(TP),
            (CoordinatorPlayback<String>) ((CoordinatorPlayback) ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP)))
        )).thenReturn(loadFuture);

        // First load, with epoch 10.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, context.state);
        Assertions.assertEquals(10, context.epoch);
        Assertions.assertEquals(coordinator, context.coordinator.coordinator());

        // Completing the load activates the coordinator at epoch 10.
        loadFuture.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, context.state);
        Assertions.assertEquals(10, context.epoch);

        // A second load with the older epoch 0 leaves state and epoch untouched.
        runtime.scheduleLoadOperation(TP, 0);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, context.state);
        Assertions.assertEquals(10, context.epoch);
    }

    /**
     * A partition whose load failed can be loaded again with a newer epoch.
     *
     * <p>The first load (epoch 10) is failed by hand and must end in FAILED with the shard
     * unloaded. A fresh shard mock and loader future are then installed, a second load
     * (epoch 11) is scheduled and completed, and the context must end in ACTIVE.
     */
    @Test
    public void testScheduleLoadingAfterLoadingFailure() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withLoader(loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Every fluent setter of the mocked builder returns the builder itself; build() yields the mocked shard.
        Mockito.when(builder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withExecutor(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // First attempt: the loader future is failed by hand.
        CompletableFuture loadFuture = new CompletableFuture();
        Mockito.when(loader.load(
            ArgumentMatchers.eq(TP),
            (CoordinatorPlayback<String>) ((CoordinatorPlayback) ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP)))
        )).thenReturn(loadFuture);

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, context.state);
        Assertions.assertEquals(10, context.epoch);
        Assertions.assertEquals(coordinator, context.coordinator.coordinator());

        loadFuture.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, context.state);
        Mockito.verify(coordinator, Mockito.times(1)).onUnloaded();

        // Second attempt: new shard mock and new loader future.
        coordinator = Mockito.mock(MockCoordinatorShard.class);
        Mockito.when(builder.build()).thenReturn(coordinator);
        loadFuture = new CompletableFuture();
        Mockito.when(loader.load(
            ArgumentMatchers.eq(TP),
            (CoordinatorPlayback<String>) ((CoordinatorPlayback) ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, TP)))
        )).thenReturn(loadFuture);

        runtime.scheduleLoadOperation(TP, 11);
        context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, context.state);
        Assertions.assertEquals(11, context.epoch);
        Assertions.assertEquals(coordinator, context.coordinator.coordinator());

        // Completing the second load activates the coordinator.
        loadFuture.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, context.state);
    }

    /**
     * Unloading with an explicit, newer epoch.
     *
     * <p>Loads the partition with epoch 10 (the test asserts it is ACTIVE right after
     * scheduling), then unloads it with epoch 11. Expects the old context to be CLOSED, the
     * shard's {@code onUnloaded} and the writer's {@code deregisterListener} to have been
     * called once, and the context to be gone from the runtime.
     */
    @Test
    public void testScheduleUnloading() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Every fluent setter of the mocked builder returns the builder itself; build() yields the mocked shard.
        Mockito.when(builder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withExecutor(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // Load with epoch 10; the context is expected to be active immediately.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, context.state);
        Assertions.assertEquals(10, context.epoch);

        // Unload with the next epoch.
        int unloadEpoch = context.epoch + 1;
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(unloadEpoch));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, context.state);

        // The shard was unloaded and the partition listener deregistered, each exactly once.
        Mockito.verify(coordinator, Mockito.times(1)).onUnloaded();
        Mockito.verify(writer, Mockito.times(1)).deregisterListener(
            ArgumentMatchers.eq(TP),
            ArgumentMatchers.any(PartitionWriter.Listener.class)
        );

        // The runtime no longer knows the partition.
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    /**
     * Unloading without an epoch ({@code OptionalInt.empty()}).
     *
     * <p>Loads the partition with epoch 10 (the test asserts it is ACTIVE right after
     * scheduling), then unloads it with an empty epoch. Expects the old context to be CLOSED,
     * the shard's {@code onUnloaded} and the writer's {@code deregisterListener} to have been
     * called once, and the context to be gone from the runtime.
     */
    @Test
    public void testScheduleUnloadingWithEmptyEpoch() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Every fluent setter of the mocked builder returns the builder itself; build() yields the mocked shard.
        Mockito.when(builder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withExecutor(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // Load with epoch 10; the context is expected to be active immediately.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, context.state);
        Assertions.assertEquals(10, context.epoch);

        // Unload without specifying an epoch.
        runtime.scheduleUnloadOperation(TP, OptionalInt.empty());
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, context.state);

        // The shard was unloaded and the partition listener deregistered, each exactly once.
        Mockito.verify(coordinator, Mockito.times(1)).onUnloaded();
        Mockito.verify(writer, Mockito.times(1)).deregisterListener(
            ArgumentMatchers.eq(TP),
            ArgumentMatchers.any(PartitionWriter.Listener.class)
        );

        // The runtime no longer knows the partition.
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    /**
     * Unloading a partition that was never loaded.
     *
     * <p>No load is scheduled. The unload (epoch 11) must not call {@code onUnloaded} on the
     * shard mock, and the runtime must still report no context for the partition.
     */
    @Test
    public void testScheduleUnloadingWhenContextDoesntExist() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = Mockito.mock(MockCoordinatorShard.class);

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(supplier)
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Builder stubs; unlike the sibling tests, withCoordinatorMetrics is not stubbed here.
        Mockito.when(builder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withLogContext(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTime(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTimer(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withTopicPartition(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.withExecutor(ArgumentMatchers.any())).thenReturn(builder);
        Mockito.when(builder.build()).thenReturn(coordinator);
        Mockito.when(supplier.get()).thenReturn(builder);

        // Unload straight away, without any prior load.
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(11));

        // The shard mock must not have been unloaded.
        Mockito.verify(coordinator, Mockito.times(0)).onUnloaded();

        // There is still no context for the partition.
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    /**
     * Verifies that an unload request carrying a partition epoch lower than the
     * epoch the coordinator was loaded with is ignored: the context must remain
     * ACTIVE and keep its original epoch.
     */
    @Test
    public void testScheduleUnloadingWithStalePartitionEpoch() {
        MockTimer timer = new MockTimer();
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)timer.time())
            .withTimer((Timer)timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Make the mocked builder fluent. Each setter is stubbed exactly once;
        // repeating an identical stubbing adds nothing.
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withExecutor((CoordinatorExecutor<String>)((CoordinatorExecutor)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);

        // Load the coordinator with epoch 10.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);

        // Unload with epoch 0, which is older than the loaded epoch: nothing must change.
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(0));
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)ctx.state);
        Assertions.assertEquals((int)10, (int)ctx.epoch);
    }

    /**
     * Verifies that unloading still closes and removes the coordinator context
     * when the shard's {@code onUnloaded} callback throws a {@link KafkaException}.
     */
    @Test
    public void testScheduleUnloadingWithException() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier shardBuilderSupplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntimeMetrics runtimeMetrics = (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class);
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)shardBuilderSupplier)
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // The shard blows up when it is told that it has been unloaded.
        ((MockCoordinatorShard)Mockito.doThrow((Throwable[])new Throwable[]{new KafkaException("error")}).when((Object)shard)).onUnloaded();

        // Fluent builder stubs: every setter hands back the same mocked builder.
        Mockito.when(shardBuilder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)shardBuilder);
        Mockito.when(shardBuilder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)shardBuilder);
        Mockito.when(shardBuilder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)shardBuilder);
        Mockito.when(shardBuilder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)shardBuilder);
        Mockito.when(shardBuilder.withExecutor((CoordinatorExecutor<String>)((CoordinatorExecutor)ArgumentMatchers.any()))).thenReturn((Object)shardBuilder);
        Mockito.when((Object)shardBuilder.build()).thenReturn((Object)shard);
        Mockito.when(shardBuilderSupplier.get()).thenReturn((Object)shardBuilder);

        // Load with epoch 10 and confirm the context is active.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.ACTIVE, (Object)context.state);
        Assertions.assertEquals((int)10, (int)context.epoch);

        // Unload with a newer epoch; the failing callback must not prevent the close.
        coordinatorRuntime.scheduleUnloadOperation(TP, OptionalInt.of(context.epoch + 1));
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.CLOSED, (Object)context.state);
        ((MockCoordinatorShard)Mockito.verify((Object)shard, (VerificationMode)Mockito.times((int)1))).onUnloaded();

        // The context is gone from the runtime.
        Assertions.assertThrows(NotCoordinatorException.class, () -> coordinatorRuntime.contextOrThrow(TP));
    }

    /**
     * Verifies that unloading a coordinator fails all pending writes with
     * {@link NotCoordinatorException}, closes the context and invokes
     * {@code onUnloaded} once, even though the runtime metrics mock throws from
     * {@code recordEventPurgatoryTime}.
     */
    @Test
    public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntimeMetrics metrics = (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class);

        // Every purgatory-time recording throws.
        // NOTE(review): presumably this is hit while deferred events are completed - confirm against CoordinatorRuntime.
        ((CoordinatorRuntimeMetrics)Mockito.doThrow((Throwable[])new Throwable[]{new KafkaException("error")}).when((Object)metrics)).recordEventPurgatoryTime(ArgumentMatchers.anyLong());

        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)timer.time())
            .withTimer((Timer)timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)writer)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier)
            .withCoordinatorRuntimeMetrics(metrics)
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Make the mocked builder fluent and have it produce the mocked shard.
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withExecutor((CoordinatorExecutor<String>)((CoordinatorExecutor)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator);
        Mockito.when(supplier.get()).thenReturn((Object)builder);

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);

        // Three payloads, each three quarters of the max message size, filled with '1', '2' and '3'.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List records = Stream.of(Character.valueOf('1'), Character.valueOf('2'), Character.valueOf('3')).map(c -> {
            char[] payload = new char[maxBatchSize * 3 / 4];
            Arrays.fill(payload, c.charValue());
            return new String(payload);
        }).collect(Collectors.toList());

        // One write per payload. Write #3 uses the third payload (the original reused the second,
        // leaving the third one dead); the size is the same so the batching is unchanged.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L), state -> new CoordinatorResult(List.of((String)records.get(0)), (Object)"response1"));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20L), state -> new CoordinatorResult(List.of((String)records.get(1)), (Object)"response2"));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20L), state -> new CoordinatorResult(List.of((String)records.get(2)), (Object)"response3"));

        // Only the first two payloads have reached the partition writer, and no write has completed.
        Assertions.assertEquals(List.of(CoordinatorRuntimeTest.records(timer.time().milliseconds(), (String)records.get(0)), CoordinatorRuntimeTest.records(timer.time().milliseconds(), (String)records.get(1))), writer.entries(TP));
        Assertions.assertFalse((boolean)write1.isDone());
        Assertions.assertFalse((boolean)write2.isDone());
        Assertions.assertFalse((boolean)write3.isDone());

        // Unload with a newer epoch.
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
        Assertions.assertEquals((Object)CoordinatorRuntime.CoordinatorState.CLOSED, (Object)ctx.state);

        // All pending writes are completed exceptionally.
        Assertions.assertTrue((boolean)write1.isDone());
        Assertions.assertTrue((boolean)write2.isDone());
        Assertions.assertTrue((boolean)write3.isDone());
        TestUtils.assertFutureThrows((Future)write1, NotCoordinatorException.class);
        TestUtils.assertFutureThrows((Future)write2, NotCoordinatorException.class);
        TestUtils.assertFutureThrows((Future)write3, NotCoordinatorException.class);

        // The shard was notified once and the context is no longer available.
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator, (VerificationMode)Mockito.times((int)1))).onUnloaded();
        Assertions.assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    /**
     * Walks a coordinator through three writes followed by two commits and a final
     * empty write, checking after each step the last written and committed offsets,
     * the snapshot epochs, the replayed records and the batches handed to the
     * partition writer.
     */
    @Test
    public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Freshly loaded coordinator: nothing written, nothing committed, one snapshot at 0.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L)), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Write #1 carries two records; it stays pending until they are committed.
        CompletableFuture firstWrite = coordinatorRuntime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"));
        Assertions.assertFalse((boolean)firstWrite.isDone());
        Assertions.assertEquals((long)2L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L), Long.valueOf(2L)), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
        Assertions.assertEquals(List.of(CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record1", "record2")), partitionWriter.entries(TP));

        // Write #2 carries one record and lands in a second batch.
        CompletableFuture secondWrite = coordinatorRuntime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(List.of("record3"), (Object)"response2"));
        Assertions.assertFalse((boolean)secondWrite.isDone());
        Assertions.assertEquals((long)3L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L), Long.valueOf(2L), Long.valueOf(3L)), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2", "record3"), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
        Assertions.assertEquals(List.of(CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record1", "record2"), CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record3")), partitionWriter.entries(TP));

        // Write #3 has no records: offsets, snapshots and log are unchanged, yet it is not done
        // while earlier records remain uncommitted.
        CompletableFuture emptyWrite = coordinatorRuntime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Collections.emptyList(), (Object)"response3"));
        Assertions.assertFalse((boolean)emptyWrite.isDone());
        Assertions.assertEquals((long)3L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L), Long.valueOf(2L), Long.valueOf(3L)), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2", "record3"), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
        Assertions.assertEquals(List.of(CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record1", "record2"), CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record3")), partitionWriter.entries(TP));

        // Committing up to offset 2 completes write #1 and drops the snapshot at 0.
        partitionWriter.commit(TP, 2L);
        Assertions.assertTrue((boolean)firstWrite.isDone());
        Assertions.assertEquals((Object)"response1", firstWrite.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals((long)2L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(2L), Long.valueOf(3L)), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Committing up to offset 3 completes writes #2 and #3.
        partitionWriter.commit(TP, 3L);
        Assertions.assertTrue((boolean)secondWrite.isDone());
        Assertions.assertTrue((boolean)emptyWrite.isDone());
        Assertions.assertEquals((Object)"response2", secondWrite.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals((Object)"response3", emptyWrite.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals((long)3L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(3L)), (Object)context.coordinator.snapshotRegistry().epochsList());

        // With everything committed, a write without records completes right away.
        CompletableFuture immediateWrite = coordinatorRuntime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Collections.emptyList(), (Object)"response4"));
        Assertions.assertTrue((boolean)immediateWrite.isDone());
        Assertions.assertEquals((Object)"response4", immediateWrite.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(List.of(Long.valueOf(3L)), (Object)context.coordinator.snapshotRegistry().epochsList());
    }

    /**
     * Verifies that a write scheduled for a partition that was never loaded fails
     * with {@link NotCoordinatorException}.
     */
    @Test
    public void testScheduleWriteOpWhenInactive() {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // No load operation was scheduled for TP, so the write has no coordinator to run on.
        CompletableFuture pendingWrite = coordinatorRuntime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(Collections.emptyList(), (Object)"response1"));
        TestUtils.assertFutureThrows((Future)pendingWrite, NotCoordinatorException.class);
    }

    /**
     * Verifies that an exception thrown by the write operation itself is propagated
     * through the returned future.
     */
    @Test
    public void testScheduleWriteOpWhenOpFails() {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();
        coordinatorRuntime.scheduleLoadOperation(TP, 10);

        // The operation throws instead of producing a result.
        CompletableFuture failedWrite = coordinatorRuntime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> {
            throw new KafkaException("error");
        });
        TestUtils.assertFutureThrows((Future)failedWrite, KafkaException.class);
    }

    /**
     * Verifies that when the shard's {@code replay} throws, the write future fails
     * with that exception and the written offset, committed offset and snapshot
     * epochs are the same as before the write.
     */
    @Test
    public void testScheduleWriteOpWhenReplayFails() {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Baseline state right after loading.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L)), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Swap in a shard whose replay always fails, reusing the existing snapshot registry.
        SnapshotRegistry registry = context.coordinator.snapshotRegistry();
        CoordinatorShard failingShard = (CoordinatorShard)new MockCoordinatorShard(registry, (CoordinatorTimer)context.timer){

            @Override
            public void replay(long offset, long producerId, short producerEpoch, String record) throws RuntimeException {
                throw new IllegalArgumentException("error");
            }
        };
        context.coordinator = new SnapshottableCoordinator(new LogContext(), registry, failingShard, TP);

        // The write fails with the replay exception.
        CompletableFuture failedWrite = coordinatorRuntime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"));
        TestUtils.assertFutureThrows((Future)failedWrite, IllegalArgumentException.class);

        // State is identical to the baseline.
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L)), (Object)context.coordinator.snapshotRegistry().epochsList());
    }

    /**
     * Verifies that when a second write fails with a {@link KafkaException}, the
     * coordinator keeps exactly the state left by the first, successful write.
     */
    @Test
    public void testScheduleWriteOpWhenWriteFails() {
        MockTimer mockTimer = new MockTimer();
        // NOTE(review): the argument 1 presumably caps how many appends succeed - confirm against MockPartitionWriter.
        MockPartitionWriter partitionWriter = new MockPartitionWriter(1);
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Baseline state right after loading.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L)), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Write #1 goes through: two records written, a snapshot taken at offset 2.
        coordinatorRuntime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"));
        Assertions.assertEquals((long)2L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L), Long.valueOf(2L)), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).records());

        // Write #2 fails.
        CompletableFuture failedWrite = coordinatorRuntime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult(List.of("record3", "record4", "record5"), (Object)"response2"));
        TestUtils.assertFutureThrows((Future)failedWrite, KafkaException.class);

        // None of its records are visible; the state is what write #1 left behind.
        Assertions.assertEquals((long)2L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L), Long.valueOf(2L)), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
    }

    /**
     * Verifies that a write scheduled with a 3 ms timeout fails with Kafka's
     * {@code TimeoutException} once the mock clock has advanced by 4 ms without
     * the records being committed.
     */
    @Test
    public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Baseline state right after loading.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(Long.valueOf(0L)), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Schedule the write, never commit it, and move the clock past its timeout.
        CompletableFuture expiringWrite = coordinatorRuntime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3L), state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"));
        mockTimer.advanceClock(4L);
        TestUtils.assertFutureThrows((Future)expiringWrite, org.apache.kafka.common.errors.TimeoutException.class);
    }

    /**
     * Verifies that {@code scheduleWriteAllOperation} runs the operation once on
     * each of three loaded coordinators, writes one record per partition and, after
     * the commits, yields the three responses in partition order.
     */
    @Test
    public void testScheduleWriteAllOperation() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Load three coordinators.
        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
        TopicPartition tp2 = new TopicPartition("__consumer_offsets", 2);
        coordinatorRuntime.scheduleLoadOperation(tp0, 10);
        coordinatorRuntime.scheduleLoadOperation(tp1, 10);
        coordinatorRuntime.scheduleLoadOperation(tp2, 10);

        // Each invocation of the operation tags its record and response with a running index.
        AtomicInteger invocationCount = new AtomicInteger(0);
        List futures = coordinatorRuntime.scheduleWriteAllOperation("write", DEFAULT_WRITE_TIMEOUT, state -> {
            int index = invocationCount.getAndIncrement();
            return new CoordinatorResult(List.of("record#" + index), List.of("response#" + index));
        });

        // One record has been written on every coordinator.
        Assertions.assertEquals((long)1L, (long)coordinatorRuntime.contextOrThrow((TopicPartition)tp0).coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)1L, (long)coordinatorRuntime.contextOrThrow((TopicPartition)tp1).coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)1L, (long)coordinatorRuntime.contextOrThrow((TopicPartition)tp2).coordinator.lastWrittenOffset());
        Assertions.assertEquals(List.of(CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record#0")), partitionWriter.entries(tp0));
        Assertions.assertEquals(List.of(CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record#1")), partitionWriter.entries(tp1));
        Assertions.assertEquals(List.of(CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record#2")), partitionWriter.entries(tp2));

        // Commit everything and collect the combined responses.
        partitionWriter.commit(tp0);
        partitionWriter.commit(tp1);
        partitionWriter.commit(tp2);
        Assertions.assertEquals(List.of("response#0", "response#1", "response#2"), FutureUtils.combineFutures((List)futures, ArrayList::new, List::addAll).get(5L, TimeUnit.SECONDS));
    }

    /**
     * Verifies a transactional write: the verification guard obtained from the
     * partition writer is passed to {@code append} together with transactional
     * records, and each record is replayed on the shard with the producer id and
     * producer epoch of the transaction.
     */
    @Test
    public void testScheduleTransactionalWriteOp() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        final MockCoordinatorShard shard = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);

        // A real builder and supplier, overridden only so that they hand out the mocked shard.
        final MockCoordinatorShardBuilder builder = new MockCoordinatorShardBuilder(){

            @Override
            public MockCoordinatorShard build() {
                return shard;
            }
        };
        MockCoordinatorShardBuilderSupplier supplier = new MockCoordinatorShardBuilderSupplier(){

            @Override
            public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
                return builder;
            }
        };
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Loading registers exactly one listener for the partition.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        ((MockPartitionWriter)Mockito.verify((Object)partitionWriter, (VerificationMode)Mockito.times((int)1))).registerListener((TopicPartition)ArgumentMatchers.eq((Object)TP), (PartitionWriter.Listener)ArgumentMatchers.any());

        // The mocked writer needs a log config and answers the verification request with a known guard.
        Mockito.when((Object)partitionWriter.config(TP)).thenReturn((Object)new LogConfig(Collections.emptyMap()));
        VerificationGuard verificationGuard = new VerificationGuard();
        Mockito.when(partitionWriter.maybeStartTransactionVerification(TP, "transactional-id", 100L, (short)50, TXN_OFFSET_COMMIT_LATEST_VERSION)).thenReturn(CompletableFuture.completedFuture(verificationGuard));

        // Schedule the transactional write for producer id 100 and producer epoch 50.
        coordinatorRuntime.scheduleTransactionalWriteOperation("tnx-write", TP, "transactional-id", 100L, (short)50, Duration.ofMillis(5000L), state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response"), Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));

        // The append carries the guard and the transactional batch.
        ((MockPartitionWriter)Mockito.verify((Object)partitionWriter, (VerificationMode)Mockito.times((int)1))).append((TopicPartition)ArgumentMatchers.eq((Object)TP), (VerificationGuard)ArgumentMatchers.eq((Object)verificationGuard), (MemoryRecords)ArgumentMatchers.eq((Object)CoordinatorRuntimeTest.transactionalRecords(100L, (short)50, mockTimer.time().milliseconds(), "record1", "record2")));

        // Both records were replayed once, at offsets 0 and 1.
        ((MockCoordinatorShard)Mockito.verify((Object)shard, (VerificationMode)Mockito.times((int)1))).replay(ArgumentMatchers.eq((long)0L), ArgumentMatchers.eq((long)100L), ArgumentMatchers.eq((short)50), (String)ArgumentMatchers.eq((Object)"record1"));
        ((MockCoordinatorShard)Mockito.verify((Object)shard, (VerificationMode)Mockito.times((int)1))).replay(ArgumentMatchers.eq((long)1L), ArgumentMatchers.eq((long)100L), ArgumentMatchers.eq((short)50), (String)ArgumentMatchers.eq((Object)"record2"));
    }

    /**
     * Checks that a transactional write fails with {@link NotEnoughReplicasException}
     * and that nothing is appended to the partition when the writer's transaction
     * verification future completes exceptionally.
     */
    @Test
    public void testScheduleTransactionalWriteOpWhenVerificationFails() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        final MockCoordinatorShard shard = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        // Builder that always hands back the mocked shard.
        final MockCoordinatorShardBuilder fixedBuilder = new MockCoordinatorShardBuilder(){

            @Override
            public MockCoordinatorShard build() {
                return shard;
            }
        };
        // Supplier that always hands back the builder above.
        MockCoordinatorShardBuilderSupplier fixedSupplier = new MockCoordinatorShardBuilderSupplier(){

            @Override
            public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
                return fixedBuilder;
            }
        };
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)fixedSupplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Loading the partition must register exactly one listener on the writer.
        runtime.scheduleLoadOperation(TP, 10);
        Mockito.verify(partitionWriter).registerListener(ArgumentMatchers.eq(TP), (PartitionWriter.Listener)ArgumentMatchers.any());

        // Make the transaction verification fail with NOT_ENOUGH_REPLICAS.
        Mockito.when(partitionWriter.maybeStartTransactionVerification(TP, "transactional-id", 100L, (short)50, TXN_OFFSET_COMMIT_LATEST_VERSION))
            .thenReturn((Object)FutureUtils.failedFuture((Throwable)Errors.NOT_ENOUGH_REPLICAS.exception()));

        CompletableFuture writeFuture = runtime.scheduleTransactionalWriteOperation(
            "tnx-write",
            TP,
            "transactional-id",
            100L,
            (short)50,
            Duration.ofMillis(5000L),
            state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));

        // The verification error surfaces on the write future and no append is attempted.
        TestUtils.assertFutureThrows((Future)writeFuture, NotEnoughReplicasException.class);
        Mockito.verify(partitionWriter, Mockito.never())
            .append((TopicPartition)ArgumentMatchers.any(), (VerificationGuard)ArgumentMatchers.any(), (MemoryRecords)ArgumentMatchers.any());
    }

    /**
     * Runs once per {@link TransactionResult}. Writes two transactional records,
     * completes the transaction with {@code result}, and checks offsets, snapshots,
     * the shard's record sets and the log contents after each step. The futures must
     * only complete once the writer commits the corresponding offsets.
     */
    @ParameterizedTest
    @EnumSource(value=TransactionResult.class)
    public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Freshly loaded coordinator: nothing written, nothing committed, one snapshot at 0.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Transactional write of two records: they stay pending under producer id 100.
        CompletableFuture txnWrite = runtime.scheduleTransactionalWriteOperation(
            "write#1",
            TP,
            "transactional-id",
            100L,
            (short)5,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));
        Assertions.assertFalse(txnWrite.isDone());
        Assertions.assertEquals(2L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 2L), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(
            List.of(CoordinatorRuntimeTest.transactionalRecords(100L, (short)5, mockTimer.time().milliseconds(), "record1", "record2")),
            partitionWriter.entries(TP));

        // Complete the transaction: one more offset is written for the end marker.
        CompletableFuture txnCompletion = runtime.scheduleTransactionCompletion("complete#1", TP, 100L, (short)5, 10, result, DEFAULT_WRITE_TIMEOUT);
        Assertions.assertFalse(txnCompletion.isDone());
        Assertions.assertEquals(3L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 2L, 3L), (Object)context.coordinator.snapshotRegistry().epochsList());

        // COMMIT exposes the records; any other result leaves the record set empty.
        boolean committed = result == TransactionResult.COMMIT;
        Set<String> expectedRecords = committed ? Set.of("record1", "record2") : Collections.emptySet();
        ControlRecordType expectedMarker = committed ? ControlRecordType.COMMIT : ControlRecordType.ABORT;
        Assertions.assertEquals(expectedRecords, ((MockCoordinatorShard)context.coordinator.coordinator()).records());
        Assertions.assertEquals(
            List.of(
                CoordinatorRuntimeTest.transactionalRecords(100L, (short)5, mockTimer.time().milliseconds(), "record1", "record2"),
                CoordinatorRuntimeTest.endTransactionMarker(100L, (short)5, mockTimer.time().milliseconds(), 10, expectedMarker)),
            partitionWriter.entries(TP));

        // Committing up to offset 2 completes the write ...
        partitionWriter.commit(TP, 2L);
        Assertions.assertTrue(txnWrite.isDone());
        Assertions.assertEquals((Object)"response1", txnWrite.get(5L, TimeUnit.SECONDS));

        // ... and committing up to offset 3 completes the transaction completion.
        partitionWriter.commit(TP, 3L);
        Assertions.assertTrue(txnCompletion.isDone());
        Assertions.assertNull(txnCompletion.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Schedules a transaction completion with a 3 ms timeout, advances the mock
     * clock past it, and checks that the future fails with a Kafka
     * {@code TimeoutException} while offsets and snapshots stay as they were after
     * the write.
     */
    @Test
    public void testScheduleTransactionCompletionWhenWriteTimesOut() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), (Object)context.coordinator.snapshotRegistry().epochsList());

        // The completion is written (offset 1) but never committed by the writer.
        CompletableFuture completion = runtime.scheduleTransactionCompletion(
            "complete#1", TP, 100L, (short)5, 10, TransactionResult.COMMIT, Duration.ofMillis(3L));
        Assertions.assertEquals(1L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 1L), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Move the clock beyond the 3 ms timeout.
        mockTimer.advanceClock(4L);
        TestUtils.assertFutureThrows((Future)completion, org.apache.kafka.common.errors.TimeoutException.class);

        // The timeout does not change the offsets or the snapshot list.
        Assertions.assertEquals(1L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 1L), (Object)context.coordinator.snapshotRegistry().epochsList());
    }

    /**
     * Checks that when the transaction completion future fails with
     * {@link KafkaException}, the coordinator state is the same as before the
     * completion was scheduled: same offsets, same snapshots, records still pending.
     */
    @Test
    public void testScheduleTransactionCompletionWhenWriteFails() {
        MockTimer mockTimer = new MockTimer();
        // NOTE(review): the `true` flag presumably makes the mock writer fail
        // end-marker writes; confirm against MockPartitionWriter.
        MockPartitionWriter partitionWriter = new MockPartitionWriter(true);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Transactional write: two records pending, none visible yet.
        runtime.scheduleTransactionalWriteOperation(
            "write#1",
            TP,
            "transactional-id",
            100L,
            (short)5,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));
        Assertions.assertEquals(2L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 2L), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)context.coordinator.coordinator()).records());

        // The completion fails ...
        CompletableFuture completion = runtime.scheduleTransactionCompletion(
            "complete#1", TP, 100L, (short)5, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        TestUtils.assertFutureThrows((Future)completion, KafkaException.class);

        // ... and the state matches what it was right after the transactional write.
        Assertions.assertEquals(2L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 2L), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
    }

    /**
     * Swaps in a shard whose {@code replayEndTransactionMarker} always throws, then
     * checks that a transaction completion fails with
     * {@link IllegalArgumentException} and leaves offsets, snapshots, pending
     * records and the log contents untouched.
     */
    @Test
    public void testScheduleTransactionCompletionWhenReplayFails() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), (Object)context.coordinator.snapshotRegistry().epochsList());

        // Replace the loaded coordinator with one that rejects every end-transaction marker,
        // reusing the existing snapshot registry and timer.
        SnapshotRegistry registry = context.coordinator.snapshotRegistry();
        MockCoordinatorShard failingShard = new MockCoordinatorShard(registry, (CoordinatorTimer)context.timer){

            @Override
            public void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) throws RuntimeException {
                throw new IllegalArgumentException("error");
            }
        };
        context.coordinator = new SnapshottableCoordinator(new LogContext(), registry, (CoordinatorShard)failingShard, TP);

        // Transactional write: two records pending, none visible yet, one batch in the log.
        runtime.scheduleTransactionalWriteOperation(
            "write#1",
            TP,
            "transactional-id",
            100L,
            (short)5,
            DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));
        Assertions.assertEquals(2L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 2L), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
        Assertions.assertEquals(
            List.of(CoordinatorRuntimeTest.transactionalRecords(100L, (short)5, mockTimer.time().milliseconds(), "record1", "record2")),
            partitionWriter.entries(TP));

        // The replay failure surfaces on the completion future ...
        CompletableFuture completion = runtime.scheduleTransactionCompletion(
            "complete#1", TP, 100L, (short)5, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        TestUtils.assertFutureThrows((Future)completion, IllegalArgumentException.class);

        // ... and nothing changed, including the log, which holds no end marker.
        Assertions.assertEquals(2L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 2L), (Object)context.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
        Assertions.assertEquals(
            List.of(CoordinatorRuntimeTest.transactionalRecords(100L, (short)5, mockTimer.time().milliseconds(), "record1", "record2")),
            partitionWriter.entries(TP));
    }

    /**
     * Schedules two writes, commits only the first, and checks that a read
     * operation is handed the last committed offset and completes immediately with
     * the value returned by the read callback.
     */
    @Test
    public void testScheduleReadOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());

        // Two writes of two records each; only the first one gets committed.
        CompletableFuture firstWrite = runtime.scheduleWriteOperation(
            "write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"));
        CompletableFuture secondWrite = runtime.scheduleWriteOperation(
            "write#2", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record3", "record4"), (Object)"response2"));
        partitionWriter.commit(TP, 2L);
        Assertions.assertTrue(firstWrite.isDone());
        Assertions.assertFalse(secondWrite.isDone());
        Assertions.assertEquals(4L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(2L, (long)context.coordinator.lastCommittedOffset());

        // The read callback must observe the last committed offset.
        CompletableFuture readFuture = runtime.scheduleReadOperation("read", TP, (shard, committedOffset) -> {
            Assertions.assertEquals((long)context.coordinator.lastCommittedOffset(), (long)committedOffset);
            return "read-response";
        });
        Assertions.assertTrue(readFuture.isDone());
        Assertions.assertEquals((Object)"read-response", readFuture.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks that a read scheduled against a partition that was never loaded fails
     * with {@link NotCoordinatorException}.
     */
    @Test
    public void testScheduleReadOpWhenPartitionInactive() {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // No load operation was scheduled for TP, so the read must be rejected.
        CompletableFuture readFuture = runtime.scheduleReadOperation("read", TP, (shard, committedOffset) -> "read-response");
        TestUtils.assertFutureThrows((Future)readFuture, NotCoordinatorException.class);
    }

    /**
     * Checks that an exception thrown by the read callback is propagated through
     * the future returned by {@code scheduleReadOperation}.
     */
    @Test
    public void testScheduleReadOpWhenOpsFails() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());

        // Two writes, of which only the first is committed.
        runtime.scheduleWriteOperation(
            "write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"));
        runtime.scheduleWriteOperation(
            "write#2", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record3", "record4"), (Object)"response2"));
        partitionWriter.commit(TP, 2L);

        // The callback sees the committed offset, then fails.
        CompletableFuture readFuture = runtime.scheduleReadOperation("read", TP, (shard, committedOffset) -> {
            Assertions.assertEquals((long)context.coordinator.lastCommittedOffset(), (long)committedOffset);
            throw new IllegalArgumentException("error");
        });
        TestUtils.assertFutureThrows((Future)readFuture, IllegalArgumentException.class);
    }

    /**
     * Loads three {@code __consumer_offsets} partitions, writes and commits one
     * record on each, and checks that a read-all operation returns the records of
     * all three coordinators once the per-coordinator futures are combined.
     */
    @Test
    public void testScheduleReadAllOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)partitionWriter)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        TopicPartition partition0 = new TopicPartition("__consumer_offsets", 0);
        TopicPartition partition1 = new TopicPartition("__consumer_offsets", 1);
        TopicPartition partition2 = new TopicPartition("__consumer_offsets", 2);

        // Load all three coordinators.
        runtime.scheduleLoadOperation(partition0, 10);
        runtime.scheduleLoadOperation(partition1, 10);
        runtime.scheduleLoadOperation(partition2, 10);

        // One record per coordinator.
        runtime.scheduleWriteOperation(
            "write#0", partition0, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record0"), (Object)"response0"));
        runtime.scheduleWriteOperation(
            "write#1", partition1, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1"), (Object)"response1"));
        runtime.scheduleWriteOperation(
            "write#2", partition2, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record2"), (Object)"response2"));

        // Commit everything written so far on each partition.
        partitionWriter.commit(partition0);
        partitionWriter.commit(partition1);
        partitionWriter.commit(partition2);

        // Read from every coordinator and merge the per-coordinator results.
        List perCoordinatorReads = runtime.scheduleReadAllOperation("read", (state, offset) -> new ArrayList<String>(state.records()));
        Assertions.assertEquals(
            List.of("record0", "record1", "record2"),
            FutureUtils.combineFutures((List)perCoordinatorReads, ArrayList::new, List::addAll).get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks that closing the runtime fails pending writes with
     * {@link NotCoordinatorException}, closes the loader, cancels the coordinator's
     * timers and shuts down the executor service.
     */
    @Test
    public void testClose() throws Exception {
        MockCoordinatorLoader spiedLoader = (MockCoordinatorLoader)Mockito.spy((Object)new MockCoordinatorLoader());
        MockTimer mockTimer = new MockTimer();
        ExecutorService mockExecutor = (ExecutorService)Mockito.mock(ExecutorService.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withLoader((CoordinatorLoader)spiedLoader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService(mockExecutor)
            .build();

        // Initial state after loading.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());

        // Two writes that stay pending because nothing is committed.
        CompletableFuture firstWrite = runtime.scheduleWriteOperation(
            "write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1", "record2"), (Object)"response1"));
        CompletableFuture secondWrite = runtime.scheduleWriteOperation(
            "write#2", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record3", "record4"), (Object)"response2"));
        Assertions.assertFalse(firstWrite.isDone());
        Assertions.assertFalse(secondWrite.isDone());

        // One timer registered on the coordinator.
        Assertions.assertEquals(0, (int)context.timer.size());
        context.timer.schedule("timer-1", 10L, TimeUnit.SECONDS, true, () -> new CoordinatorResult(List.of("record5", "record6"), null));
        Assertions.assertEquals(1, (int)context.timer.size());

        runtime.close();

        // Pending writes are failed, and loader, timers and executor are all released.
        TestUtils.assertFutureThrows((Future)firstWrite, NotCoordinatorException.class);
        TestUtils.assertFutureThrows((Future)secondWrite, NotCoordinatorException.class);
        Mockito.verify(spiedLoader).close();
        Assertions.assertEquals(0, (int)context.timer.size());
        Mockito.verify(mockExecutor).shutdown();
    }

    /**
     * Checks how metadata images reach coordinators depending on their load state.
     *
     * <p>Two partitions are scheduled for loading against a mocked loader whose
     * futures are completed by hand. The test asserts that:
     * <ul>
     *   <li>when the first load completes, its shard gets {@code onLoaded} with
     *       {@code MetadataImage.EMPTY};</li>
     *   <li>a new image published afterwards is passed to the loaded shard through
     *       {@code onNewMetadataImage};</li>
     *   <li>when the second load completes after that, its shard gets
     *       {@code onLoaded} with the new image.</li>
     * </ul>
     */
    @Test
    public void testOnNewMetadataImage() {
        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = (MockCoordinatorLoader)Mockito.mock(MockCoordinatorLoader.class);
        MockPartitionWriter writer = (MockPartitionWriter)Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = (MockCoordinatorShardBuilderSupplier)Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = (MockCoordinatorShardBuilder)Mockito.mock(MockCoordinatorShardBuilder.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)timer.time())
            .withTimer((Timer)timer)
            .withLoader((CoordinatorLoader)loader)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)writer)
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)supplier)
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();
        MockCoordinatorShard coordinator0 = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);
        MockCoordinatorShard coordinator1 = (MockCoordinatorShard)Mockito.mock(MockCoordinatorShard.class);

        // Make the mocked builder fluent: every with* call returns the builder itself.
        // Each with* method is stubbed exactly once.
        Mockito.when(supplier.get()).thenReturn((Object)builder);
        Mockito.when(builder.withSnapshotRegistry((SnapshotRegistry)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withLogContext((LogContext)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTime((Time)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTimer((CoordinatorTimer<Void, String>)((CoordinatorTimer)ArgumentMatchers.any()))).thenReturn((Object)builder);
        Mockito.when(builder.withCoordinatorMetrics((CoordinatorMetrics)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withTopicPartition((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)builder);
        Mockito.when(builder.withExecutor((CoordinatorExecutor<String>)((CoordinatorExecutor)ArgumentMatchers.any()))).thenReturn((Object)builder);
        // The first build() yields coordinator0, the second coordinator1.
        Mockito.when((Object)builder.build()).thenReturn((Object)coordinator0).thenReturn((Object)coordinator1);

        // Loads stay in flight until the test completes these futures by hand.
        CompletableFuture<Object> future0 = new CompletableFuture<Object>();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)tp0), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, tp0))))).thenReturn(future0);
        CompletableFuture<Object> future1 = new CompletableFuture<Object>();
        Mockito.when(loader.load((TopicPartition)ArgumentMatchers.eq((Object)tp1), (CoordinatorPlayback<String>)((CoordinatorPlayback)ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(runtime, tp1))))).thenReturn(future1);

        runtime.scheduleLoadOperation(tp0, 0);
        runtime.scheduleLoadOperation(tp1, 0);
        Assertions.assertEquals((Object)coordinator0, (Object)runtime.contextOrThrow((TopicPartition)tp0).coordinator.coordinator());
        Assertions.assertEquals((Object)coordinator1, (Object)runtime.contextOrThrow((TopicPartition)tp1).coordinator.coordinator());

        // tp0 finishes loading before any image is published: it sees the empty image.
        future0.complete(null);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator0)).onLoaded(MetadataImage.EMPTY);

        // Publish a new image: the already-loaded coordinator is notified of it.
        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
        runtime.onNewMetadataImage(newImage, delta);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator0)).onNewMetadataImage(newImage, delta);

        // tp1 finishes loading afterwards and is handed the new image on load.
        future1.complete(null);
        ((MockCoordinatorShard)Mockito.verify((Object)coordinator1)).onLoaded(newImage);
    }

    /**
     * Registers two coordinator timers (10 ms and 20 ms) and advances the mock
     * clock in two steps. After each step the records produced by the expired timer
     * must be present in the shard and the expired timer must be gone.
     */
    @Test
    public void testScheduleTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime((Time)mockTimer.time())
            .withTimer((Timer)mockTimer)
            .withDefaultWriteTimeOut(Duration.ofMillis(30L))
            .withLoader((CoordinatorLoader)new MockCoordinatorLoader())
            .withEventProcessor((CoordinatorEventProcessor)new DirectEventProcessor())
            .withPartitionWriter((PartitionWriter)new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics((CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class))
            .withSerializer((Serializer)new StringSerializer())
            .withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class))
            .build();

        // Initial state after loading: no offsets, no timers.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, (long)context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, (long)context.coordinator.lastCommittedOffset());
        Assertions.assertEquals(0, (int)context.timer.size());

        // Two timers, each producing two records when it fires.
        context.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(List.of("record1", "record2"), null));
        context.timer.schedule("timer-2", 20L, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult(List.of("record3", "record4"), null));
        Assertions.assertEquals(2, (int)context.timer.size());

        // At 11 ms only timer-1 has expired.
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(Set.of("record1", "record2"), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
        Assertions.assertEquals(1, (int)context.timer.size());

        // At 22 ms timer-2 has expired as well.
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(Set.of("record1", "record2", "record3", "record4"), ((MockCoordinatorShard)context.coordinator.coordinator()).records());
        Assertions.assertEquals(0, (int)context.timer.size());
    }

    /**
     * Checks that scheduling a timer under a key that is already in use replaces
     * the earlier timer, so only the operation of the latest schedule call
     * contributes records.
     *
     * <p>The timer "timer-1" is scheduled, allowed to expire (its event stays
     * queued in the manual processor), then scheduled twice more. After both
     * queued events are processed, the shard is expected to hold only "record3".
     *
     * @throws InterruptedException declared because advancing the mock clock may throw it
     */
    @Test
    public void testRescheduleTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        ManualEventProcessor eventProcessor = new ManualEventProcessor();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(eventProcessor)
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // NOTE(review): the two polls presumably drain the events produced by the
        // load operation - confirm; the queue is asserted to be empty right after.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        eventProcessor.poll();
        eventProcessor.poll();

        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals(0, context.timer.size());
        Assertions.assertEquals(0, eventProcessor.size());

        // First schedule of timer-1.
        context.timer.schedule(
            "timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(List.of("record1"), null));
        Assertions.assertEquals(1, context.timer.size());

        // Let it expire: one event is now waiting in the processor.
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, eventProcessor.size());

        // Re-schedule under the same key; the timer count must stay at one.
        context.timer.schedule(
            "timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(List.of("record2"), null));
        Assertions.assertEquals(1, context.timer.size());

        // And once more, still a single timer.
        context.timer.schedule(
            "timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(List.of("record3"), null));
        Assertions.assertEquals(1, context.timer.size());

        // Expire the latest schedule: a second event gets queued.
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(2, eventProcessor.size());

        // Process both queued events.
        Assertions.assertTrue(eventProcessor.poll());
        Assertions.assertTrue(eventProcessor.poll());

        // Only the operation of the last schedule call produced a record.
        Assertions.assertEquals(
            Set.of("record3"),
            ((MockCoordinatorShard) context.coordinator.coordinator()).records());
        Assertions.assertEquals(0, context.timer.size());
    }

    /**
     * Checks that cancelling a timer prevents any of its operations from writing
     * records, including an already expired one whose event is still queued.
     *
     * <p>"timer-1" expires once (event left queued), is re-scheduled, and is then
     * cancelled. After the queued event is processed the shard is expected to
     * hold no records.
     *
     * @throws InterruptedException declared because advancing the mock clock may throw it
     */
    @Test
    public void testCancelTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        ManualEventProcessor eventProcessor = new ManualEventProcessor();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(eventProcessor)
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // NOTE(review): the two polls presumably drain the events produced by the
        // load operation - confirm; the queue is asserted to be empty right after.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        eventProcessor.poll();
        eventProcessor.poll();

        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals(0, context.timer.size());
        Assertions.assertEquals(0, eventProcessor.size());

        // Schedule timer-1 and let it expire; its event waits in the processor.
        context.timer.schedule(
            "timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(List.of("record1"), null));
        Assertions.assertEquals(1, context.timer.size());
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, eventProcessor.size());

        // Schedule it again under the same key...
        context.timer.schedule(
            "timer-1", 10L, TimeUnit.MILLISECONDS, true,
            () -> new CoordinatorResult(List.of("record2"), null));
        Assertions.assertEquals(1, context.timer.size());

        // ...then cancel it.
        context.timer.cancel("timer-1");
        Assertions.assertEquals(0, context.timer.size());

        // Advancing the clock must not queue anything new.
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, eventProcessor.size());

        // Processing the stale event must leave the shard untouched.
        Assertions.assertTrue(eventProcessor.poll());
        Assertions.assertEquals(
            Collections.emptySet(),
            ((MockCoordinatorShard) context.coordinator.coordinator()).records());
        Assertions.assertEquals(0, context.timer.size());
    }

    /**
     * Checks that a timer scheduled with {@code retry = true} keeps being retried
     * while its operation throws, and stays registered until it is cancelled.
     *
     * <p>The operation always throws a {@link KafkaException}. The invocation
     * counter is expected to grow by one after the initial 11 ms step and after
     * each subsequent 501 ms step.
     *
     * @throws InterruptedException declared because advancing the mock clock may throw it
     */
    @Test
    public void testRetryableTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals(0, context.timer.size());

        // Counts how many times the failing operation has been executed.
        AtomicInteger invocations = new AtomicInteger(0);
        context.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, () -> {
            invocations.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, context.timer.size());

        // Initial expiration: first attempt, timer remains registered.
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, invocations.get());
        Assertions.assertEquals(1, context.timer.size());

        // NOTE(review): 501 ms presumably exceeds a 500 ms default retry backoff - confirm.
        mockTimer.advanceClock(501L);
        Assertions.assertEquals(2, invocations.get());
        Assertions.assertEquals(1, context.timer.size());

        mockTimer.advanceClock(501L);
        Assertions.assertEquals(3, invocations.get());
        Assertions.assertEquals(1, context.timer.size());

        // Cancelling removes the timer.
        context.timer.cancel("timer-1");
        Assertions.assertEquals(0, context.timer.size());
    }

    /**
     * Checks that a retryable timer scheduled with an explicit backoff argument of
     * {@code 1000L} is only retried after two 501 ms clock steps, not after one.
     *
     * <p>The operation always throws a {@link KafkaException}; the invocation
     * counter and the number of registered timers are asserted after every step.
     *
     * @throws InterruptedException declared because advancing the mock clock may throw it
     */
    @Test
    public void testRetryableTimerWithCustomBackoff() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals(0, context.timer.size());

        // Counts how many times the failing operation has been executed.
        AtomicInteger invocations = new AtomicInteger(0);
        context.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, true, 1000L, () -> {
            invocations.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, context.timer.size());

        // Initial expiration: first attempt.
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, invocations.get());
        Assertions.assertEquals(1, context.timer.size());

        // Half-way through the backoff: no new attempt yet.
        mockTimer.advanceClock(501L);
        Assertions.assertEquals(1, invocations.get());
        Assertions.assertEquals(1, context.timer.size());

        // Backoff elapsed: second attempt.
        mockTimer.advanceClock(501L);
        Assertions.assertEquals(2, invocations.get());
        Assertions.assertEquals(1, context.timer.size());

        // Half-way through the next backoff: still two attempts.
        mockTimer.advanceClock(501L);
        Assertions.assertEquals(2, invocations.get());
        Assertions.assertEquals(1, context.timer.size());

        // Backoff elapsed again: third attempt.
        mockTimer.advanceClock(501L);
        Assertions.assertEquals(3, invocations.get());
        Assertions.assertEquals(1, context.timer.size());

        // Cancelling removes the timer.
        context.timer.cancel("timer-1");
        Assertions.assertEquals(0, context.timer.size());
    }

    /**
     * Checks that a timer scheduled with {@code retry = false} is executed once
     * and then dropped, even though its operation throws.
     *
     * @throws InterruptedException declared because advancing the mock clock may throw it
     */
    @Test
    public void testNonRetryableTimer() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals(0, context.timer.size());

        // Counts how many times the failing operation has been executed.
        AtomicInteger invocations = new AtomicInteger(0);
        context.timer.schedule("timer-1", 10L, TimeUnit.MILLISECONDS, false, () -> {
            invocations.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, context.timer.size());

        // After expiration the operation ran once and the timer is gone.
        mockTimer.advanceClock(11L);
        Assertions.assertEquals(1, invocations.get());
        Assertions.assertEquals(0, context.timer.size());
    }

    /**
     * Checks that {@code scheduleIfAbsent} leaves an already registered timer
     * untouched: a second call under the same key, made 5 ms into the first
     * timer's 10 ms delay, must not postpone or duplicate the execution.
     *
     * <p>After a further 6 ms the operation is expected to have run exactly once
     * and no timer should remain registered.
     *
     * @throws InterruptedException declared because advancing the mock clock may throw it
     */
    @Test
    public void testTimerScheduleIfAbsent() throws InterruptedException {
        MockTimer mockTimer = new MockTimer();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(new MockPartitionWriter())
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals(0, context.timer.size());

        // Counts how many times either operation has been executed.
        AtomicInteger invocations = new AtomicInteger(0);
        context.timer.scheduleIfAbsent("timer-1", 10L, TimeUnit.MILLISECONDS, false, () -> {
            invocations.incrementAndGet();
            throw new KafkaException("error");
        });
        Assertions.assertEquals(1, context.timer.size());

        // Half-way through the delay, try to schedule the same key again.
        mockTimer.advanceClock(5L);
        context.timer.scheduleIfAbsent("timer-1", 10L, TimeUnit.MILLISECONDS, false, () -> {
            invocations.incrementAndGet();
            throw new KafkaException("error");
        });

        // 5 + 6 = 11 ms: the original deadline has passed.
        mockTimer.advanceClock(6L);
        Assertions.assertEquals(1, invocations.get());
        Assertions.assertEquals(0, context.timer.size());
    }

    /**
     * Checks that every coordinator state transition is reported to the runtime
     * metrics via {@code recordPartitionStateChange}.
     *
     * <p>Two partitions are loaded through a mocked loader: the load of
     * {@code TP} is failed (INITIAL -> LOADING -> FAILED) and the load of
     * partition 1 of {@code __consumer_offsets} is completed
     * (INITIAL -> LOADING -> ACTIVE). Closing the runtime is then expected to
     * report FAILED -> CLOSED and ACTIVE -> CLOSED.
     *
     * @throws Exception if closing the runtime fails
     */
    @Test
    public void testStateChanges() throws Exception {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorLoader coordinatorLoader = Mockito.mock(MockCoordinatorLoader.class);
        MockCoordinatorShardBuilderSupplier shardBuilderSupplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntimeMetrics runtimeMetrics = Mockito.mock(CoordinatorRuntimeMetrics.class);

        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(coordinatorLoader)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(shardBuilderSupplier)
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Make the mocked shard builder fluent and have it produce the mocked shard.
        Mockito.when(shardBuilder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withExecutor(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.build()).thenReturn(shard);
        Mockito.when(shardBuilderSupplier.get()).thenReturn(shardBuilder);

        // --- First partition: the load fails. ---
        CompletableFuture failingLoad = new CompletableFuture();
        Mockito.when(coordinatorLoader.load(
            ArgumentMatchers.eq(TP),
            (CoordinatorPlayback<String>) ((CoordinatorPlayback) ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(coordinatorRuntime, TP)))
        )).thenReturn(failingLoad);

        coordinatorRuntime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext failedContext = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, failedContext.state);
        Mockito.verify(runtimeMetrics, Mockito.times(1))
            .recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.INITIAL, CoordinatorRuntime.CoordinatorState.LOADING);

        failingLoad.completeExceptionally(new Exception("failure"));
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, failedContext.state);
        Mockito.verify(runtimeMetrics, Mockito.times(1))
            .recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.LOADING, CoordinatorRuntime.CoordinatorState.FAILED);

        // --- Second partition: the load succeeds. ---
        TopicPartition secondPartition = new TopicPartition("__consumer_offsets", 1);
        CompletableFuture succeedingLoad = new CompletableFuture();
        Mockito.when(coordinatorLoader.load(
            ArgumentMatchers.eq(secondPartition),
            (CoordinatorPlayback<String>) ((CoordinatorPlayback) ArgumentMatchers.argThat(CoordinatorRuntimeTest.coordinatorMatcher(coordinatorRuntime, secondPartition)))
        )).thenReturn(succeedingLoad);

        coordinatorRuntime.scheduleLoadOperation(secondPartition, 0);
        CoordinatorRuntime.CoordinatorContext activeContext = coordinatorRuntime.contextOrThrow(secondPartition);
        Assertions.assertEquals(shard, activeContext.coordinator.coordinator());
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, activeContext.state);
        // INITIAL -> LOADING has now been reported once per partition.
        Mockito.verify(runtimeMetrics, Mockito.times(2))
            .recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.INITIAL, CoordinatorRuntime.CoordinatorState.LOADING);

        succeedingLoad.complete(null);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, activeContext.state);
        Mockito.verify(runtimeMetrics, Mockito.times(1))
            .recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.LOADING, CoordinatorRuntime.CoordinatorState.ACTIVE);

        // --- Closing the runtime moves both coordinators to CLOSED. ---
        coordinatorRuntime.close();
        Mockito.verify(runtimeMetrics, Mockito.times(1))
            .recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.FAILED, CoordinatorRuntime.CoordinatorState.CLOSED);
        Mockito.verify(runtimeMetrics, Mockito.times(1))
            .recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.ACTIVE, CoordinatorRuntime.CoordinatorState.CLOSED);
    }

    /**
     * Checks that, once a partition load finishes, the runtime forwards the load
     * summary's start and end times to
     * {@code CoordinatorRuntimeMetrics.recordPartitionLoadSensor}.
     *
     * <p>The loader is configured with a summary whose first two values are
     * {@code startTimeMs} and {@code startTimeMs + 1000}; exactly those two values
     * are expected in the single metrics call.
     */
    @Test
    public void testPartitionLoadSensor() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier shardBuilderSupplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntimeMetrics runtimeMetrics = Mockito.mock(CoordinatorRuntimeMetrics.class);
        long startTimeMs = mockTimer.time().milliseconds();

        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader(
                new CoordinatorLoader.LoadSummary(startTimeMs, startTimeMs + 1000L, startTimeMs + 500L, 30L, 3000L),
                Collections.emptyList(),
                Collections.emptyList()))
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(shardBuilderSupplier)
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Make the mocked shard builder fluent and have it produce the mocked shard.
        Mockito.when(shardBuilder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withExecutor(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.build()).thenReturn(shard);
        Mockito.when(shardBuilderSupplier.get()).thenReturn(shardBuilder);

        // Before any load is scheduled there is no context for the partition.
        Assertions.assertThrows(NotCoordinatorException.class, () -> coordinatorRuntime.contextOrThrow(TP));

        // Load the partition; the coordinator is expected to be active right away.
        coordinatorRuntime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, context.state);

        // The load sensor receives the summary's start and end times.
        Mockito.verify(runtimeMetrics, Mockito.times(1))
            .recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000L);
    }

    /**
     * Checks the offsets and snapshots left behind by a partition load that
     * reported progress through the mock loader.
     *
     * <p>The loader is given the lists {@code [5, 15, 27]} and {@code [5, 15]}.
     * After the load the coordinator is expected to be ACTIVE with a last written
     * offset of 27 and a last committed offset of 15, and the snapshot registry is
     * expected to hold snapshots at 15 and 27 but not at 0 or 5.
     * NOTE(review): the lists are presumably the last-written-offset and
     * last-committed-offset updates emitted during the load - confirm against
     * MockCoordinatorLoader.
     */
    @Test
    public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier shardBuilderSupplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntimeMetrics runtimeMetrics = Mockito.mock(CoordinatorRuntimeMetrics.class);

        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(Time.SYSTEM)
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader(
                new CoordinatorLoader.LoadSummary(1000L, 2000L, 1500L, 30L, 3000L),
                List.of(5L, 15L, 27L),
                List.of(5L, 15L)))
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(shardBuilderSupplier)
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Make the mocked shard builder fluent and have it produce the mocked shard.
        Mockito.when(shardBuilder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withExecutor(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.build()).thenReturn(shard);
        Mockito.when(shardBuilderSupplier.get()).thenReturn(shardBuilder);

        // Load the partition.
        coordinatorRuntime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);

        // The coordinator is active and its offsets match the last loader updates.
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, context.state);
        Assertions.assertEquals(27L, context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(15L, context.coordinator.lastCommittedOffset());

        // Snapshots below the committed offset are gone; 15 and 27 remain.
        Assertions.assertFalse(context.coordinator.snapshotRegistry().hasSnapshot(0L));
        Assertions.assertFalse(context.coordinator.snapshotRegistry().hasSnapshot(5L));
        Assertions.assertTrue(context.coordinator.snapshotRegistry().hasSnapshot(15L));
        Assertions.assertTrue(context.coordinator.snapshotRegistry().hasSnapshot(27L));
    }

    /**
     * Checks the state after a partition load for which the mock loader was
     * given two empty offset lists.
     *
     * <p>The coordinator is expected to be ACTIVE with last written and last
     * committed offsets of 0, and the snapshot registry is expected to still hold
     * the snapshot at offset 0.
     */
    @Test
    public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = Mockito.mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier shardBuilderSupplier = Mockito.mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder shardBuilder = Mockito.mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard shard = Mockito.mock(MockCoordinatorShard.class);
        CoordinatorRuntimeMetrics runtimeMetrics = Mockito.mock(CoordinatorRuntimeMetrics.class);

        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(Time.SYSTEM)
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader(
                new CoordinatorLoader.LoadSummary(1000L, 2000L, 1500L, 30L, 3000L),
                Collections.emptyList(),
                Collections.emptyList()))
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(shardBuilderSupplier)
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Make the mocked shard builder fluent and have it produce the mocked shard.
        Mockito.when(shardBuilder.withSnapshotRegistry(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withLogContext(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTime(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTimer(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withCoordinatorMetrics(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withTopicPartition(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.withExecutor(ArgumentMatchers.any())).thenReturn(shardBuilder);
        Mockito.when(shardBuilder.build()).thenReturn(shard);
        Mockito.when(shardBuilderSupplier.get()).thenReturn(shardBuilder);

        // Load the partition.
        coordinatorRuntime.scheduleLoadOperation(TP, 0);
        CoordinatorRuntime.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);

        // Active coordinator, offsets still at zero, initial snapshot still present.
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, context.state);
        Assertions.assertEquals(0L, context.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, context.coordinator.lastCommittedOffset());
        Assertions.assertTrue(context.coordinator.snapshotRegistry().hasSnapshot(0L));
    }

    /**
     * Checks how high watermark updates from the partition writer are buffered
     * in the high watermark listener and applied by a single queued event.
     *
     * <p>Two writes are processed; then the writer commits offsets 1 and 2 in
     * turn. Only one event is expected in the processor after both commits while
     * the listener's last high watermark follows the commits (1, then 2). After
     * that event is polled, the listener is expected to be back at -1, the
     * coordinator's last committed offset at 2, and both write futures done.
     */
    @Test
    public void testHighWatermarkUpdate() {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        ManualEventProcessor eventProcessor = new ManualEventProcessor();
        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(eventProcessor)
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // NOTE(review): the two polls presumably process the events produced by
        // the load operation - confirm against ManualEventProcessor usage.
        coordinatorRuntime.scheduleLoadOperation(TP, 10);
        eventProcessor.poll();
        eventProcessor.poll();

        // Write #1, processed immediately.
        CompletableFuture firstWrite = coordinatorRuntime.scheduleWriteOperation(
            "write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1"), "response1"));
        eventProcessor.poll();

        // Write #2, processed immediately.
        CompletableFuture secondWrite = coordinatorRuntime.scheduleWriteOperation(
            "write#2", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record2"), "response2"));
        eventProcessor.poll();

        // Both records reached the partition writer as separate entries.
        Assertions.assertEquals(
            List.of(
                CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record1"),
                CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), "record2")),
            partitionWriter.entries(TP));

        // Nothing has been committed yet.
        Assertions.assertEquals(-1L, coordinatorRuntime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Commit offset 1: one event is queued and the listener remembers 1.
        partitionWriter.commit(TP, 1L);
        Assertions.assertEquals(1, eventProcessor.size());
        Assertions.assertEquals(1L, coordinatorRuntime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Commit offset 2: still a single queued event, the listener now remembers 2.
        partitionWriter.commit(TP, 2L);
        Assertions.assertEquals(1, eventProcessor.size());
        Assertions.assertEquals(2L, coordinatorRuntime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Process the queued event.
        eventProcessor.poll();

        // The listener is reset and the coordinator committed up to offset 2.
        Assertions.assertEquals(-1L, coordinatorRuntime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals(2L, coordinatorRuntime.contextOrThrow(TP).coordinator.lastCommittedOffset());
        Assertions.assertTrue(firstWrite.isDone());
        Assertions.assertTrue(secondWrite.isDone());
    }

    /**
     * Checks that writes waiting for a commit are still completed when the
     * metrics call {@code recordEventPurgatoryTime} throws.
     *
     * <p>The metrics mock is stubbed to throw a {@link KafkaException} from
     * {@code recordEventPurgatoryTime}. Three writes are scheduled with payloads
     * of 3/4 of the writer's max message size; the first two payloads are
     * expected to have reached the writer. After the writer commits offset 2,
     * writes #1 and #2 must be done with their responses while write #3 is still
     * pending.
     *
     * @throws ExecutionException if a write future completed exceptionally
     * @throws InterruptedException if waiting on a write future is interrupted
     * @throws TimeoutException if a write future is not available within 5 seconds
     */
    @Test
    public void testHighWatermarkUpdateWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer mockTimer = new MockTimer();
        MockPartitionWriter partitionWriter = new MockPartitionWriter();
        CoordinatorRuntimeMetrics runtimeMetrics = Mockito.mock(CoordinatorRuntimeMetrics.class);

        // Every purgatory-time recording fails.
        Mockito.doThrow(new KafkaException("error"))
            .when(runtimeMetrics)
            .recordEventPurgatoryTime(ArgumentMatchers.anyLong());

        CoordinatorRuntime coordinatorRuntime = new CoordinatorRuntime.Builder()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        coordinatorRuntime.scheduleLoadOperation(TP, 10);

        // Build three payloads ('1...', '2...', '3...'), each 3/4 of the max message size.
        // NOTE(review): the size is presumably chosen so that two payloads cannot
        // share one batch, forcing a flush on each following write - confirm.
        int maxBatchSize = partitionWriter.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3')
            .map(fillChar -> {
                char[] buffer = new char[maxBatchSize * 3 / 4];
                Arrays.fill(buffer, fillChar);
                return new String(buffer);
            })
            .collect(Collectors.toList());

        CompletableFuture firstWrite = coordinatorRuntime.scheduleWriteOperation(
            "write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(List.of(payloads.get(0)), "response1"));
        CompletableFuture secondWrite = coordinatorRuntime.scheduleWriteOperation(
            "write#2", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(List.of(payloads.get(1)), "response2"));
        // NOTE(review): write #3 reuses the second payload; the third generated
        // payload is never used - confirm whether index 2 was intended.
        CompletableFuture thirdWrite = coordinatorRuntime.scheduleWriteOperation(
            "write#3", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(List.of(payloads.get(1)), "response3"));

        // Only the first two payloads have been handed to the partition writer.
        Assertions.assertEquals(
            List.of(
                CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), payloads.get(0)),
                CoordinatorRuntimeTest.records(mockTimer.time().milliseconds(), payloads.get(1))),
            partitionWriter.entries(TP));

        // Nothing is committed yet, so no write has completed.
        Assertions.assertFalse(firstWrite.isDone());
        Assertions.assertFalse(secondWrite.isDone());
        Assertions.assertFalse(thirdWrite.isDone());

        // Commit the first two records.
        partitionWriter.commit(TP, 2L);

        // Writes #1 and #2 complete despite the metrics exception; #3 is still pending.
        Assertions.assertTrue(firstWrite.isDone());
        Assertions.assertTrue(secondWrite.isDone());
        Assertions.assertFalse(thirdWrite.isDone());
        Assertions.assertEquals("response1", firstWrite.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response2", secondWrite.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks that the timer tasks registered for two regular writes are cancelled once the
     * high watermark update event has been processed.
     *
     * <p>Events are driven by hand through a {@code ManualEventProcessor}: the tasks are
     * asserted to be still live after the commits and before the queued event is polled, and
     * cancelled afterwards.
     */
    @Test
    public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Schedule the load, then process two queued events by hand.
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();

        // Two writes, each processed right after being scheduled.
        CompletableFuture firstWrite = runtime.scheduleWriteOperation("Write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record1"), "response1"));
        processor.poll();
        CompletableFuture secondWrite = runtime.scheduleWriteOperation("Write#2", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(List.of("record2"), "response2"));
        processor.poll();

        Assertions.assertEquals(
            List.of(
                CoordinatorRuntimeTest.records(timer.time().milliseconds(), "record1"),
                CoordinatorRuntimeTest.records(timer.time().milliseconds(), "record2")),
            writer.entries(TP));
        // One timer task per pending write.
        Assertions.assertEquals(2, timer.size());

        // Two commits result in a single queued event carrying the latest high watermark.
        writer.commit(TP, 1L);
        writer.commit(TP, 2L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertEquals(2L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Nothing is cancelled until that event is processed.
        Assertions.assertEquals(2, timer.size());
        timer.taskQueue().forEach(entry -> Assertions.assertFalse(entry.cancelled()));

        processor.poll();
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals(2L, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
        Assertions.assertTrue(firstWrite.isDone());
        Assertions.assertTrue(secondWrite.isDone());

        // The tasks are still in the queue but are now flagged as cancelled.
        Assertions.assertEquals(2, timer.size());
        timer.taskQueue().forEach(entry -> Assertions.assertTrue(entry.cancelled()));
    }

    /**
     * Checks that the timer task registered for a transaction completion is cancelled once
     * the high watermark update event has been processed.
     *
     * <p>Mirrors the regular-write variant of this test, but schedules a single
     * {@code scheduleTransactionCompletion} call and expects one COMMIT end-transaction
     * marker in the log.
     */
    @Test
    public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Schedule the load, then process two queued events by hand.
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();

        // Complete a transaction and process the resulting event.
        CompletableFuture completion = runtime.scheduleTransactionCompletion(
            "transactional-write", TP, 100L, (short) 50, 1, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        processor.poll();

        Assertions.assertEquals(
            List.of(CoordinatorRuntimeTest.endTransactionMarker(100L, (short) 50, timer.time().milliseconds(), 1, ControlRecordType.COMMIT)),
            writer.entries(TP));
        // A single timer task is pending for the completion.
        Assertions.assertEquals(1, timer.size());

        // The commit queues one event and records the new high watermark.
        writer.commit(TP, 1L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Nothing is cancelled until that event is processed.
        Assertions.assertEquals(1, timer.size());
        timer.taskQueue().forEach(entry -> Assertions.assertFalse(entry.cancelled()));

        processor.poll();
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
        Assertions.assertTrue(completion.isDone());

        // The task is still in the queue but is now flagged as cancelled.
        Assertions.assertEquals(1, timer.size());
        timer.taskQueue().forEach(entry -> Assertions.assertTrue(entry.cancelled()));
    }

    /**
     * Checks that a single write of 3000 small records produces one batch whose size is
     * larger than 16384 bytes but smaller than the partition's max message size.
     *
     * <p>NOTE(review): 16384 is presumably the runtime's initial buffer size; confirm against
     * {@code CoordinatorRuntime}.
     */
    @Test
    public void testAppendRecordBatchSize() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        StringSerializer serializer = new StringSerializer();
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(serializer)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();
        runtime.scheduleLoadOperation(TP, 10);

        // Freshly loaded coordinator: nothing written, nothing committed, one snapshot at 0.
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());

        int maxBatchSize = writer.config(TP).maxMessageSize();
        Assertions.assertTrue(maxBatchSize > 16384);

        // The records are plain strings. The decompiled code typed them as CallSite and cast
        // each string accordingly, which fails with ClassCastException at runtime.
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 3000; ++i) {
            records.add("record-" + i);
        }

        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult(records, "response1"));
        Assertions.assertFalse(write1.isCompletedExceptionally());

        // The written batch must fall strictly between 16384 bytes and the max message size.
        int batchSize = writer.entries(TP).get(0).sizeInBytes();
        Assertions.assertTrue(batchSize > 16384 && batchSize < maxBatchSize);
    }

    /**
     * Exercises write batching with an append linger of 10 ms.
     *
     * <p>Four payloads of a quarter of the max batch size are written across three
     * operations. The assertions establish that: the first three payloads are only replayed
     * to the shard (nothing in the log) until write#3 is scheduled, at which point they are
     * written as one batch; the fourth payload is written after the clock advances by 11 ms;
     * and all three futures complete with their responses once the writer commits.
     */
    @Test
    public void testScheduleWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(writeTimeout)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();
        runtime.scheduleLoadOperation(TP, 10);

        // Initial state: nothing written or committed, no open batch.
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four payloads, each a quarter of the max batch size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(fill -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            return new String(chars);
        }).collect(Collectors.toList());

        // Write #1 carries two payloads: a batch is opened, nothing reaches the log.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(0, 2), "response1"));
        Assertions.assertFalse(write1.isDone());
        Assertions.assertNotNull(ctx.currentBatch);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            List.of(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #2 adds a third payload; still nothing in the log.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(2, 3), "response2"));
        Assertions.assertFalse(write2.isDone());
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            List.of(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #3 adds the fourth payload: the first three are now written as one batch.
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(3, 4), "response3"));
        Assertions.assertFalse(write3.isDone());
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            List.of(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)),
                new MockCoordinatorShard.RecordAndMetadata(3L, payloads.get(3))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(
            List.of(CoordinatorRuntimeTest.records(timer.time().milliseconds(), payloads.subList(0, 3))),
            writer.entries(TP));

        // Advance past the 10 ms linger: the fourth payload is written as a second batch.
        timer.advanceClock(11L);
        Assertions.assertEquals(4L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            List.of(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)),
                new MockCoordinatorShard.RecordAndMetadata(3L, payloads.get(3))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        // Both batches are expected to carry the timestamp from before the clock was advanced.
        Assertions.assertEquals(
            List.of(
                CoordinatorRuntimeTest.records(timer.time().milliseconds() - 11L, payloads.subList(0, 3)),
                CoordinatorRuntimeTest.records(timer.time().milliseconds() - 11L, payloads.subList(3, 4))),
            writer.entries(TP));

        // Committing completes every write with its own response.
        writer.commit(TP);
        Assertions.assertTrue(write1.isDone());
        Assertions.assertTrue(write2.isDone());
        Assertions.assertTrue(write3.isDone());
        Assertions.assertEquals("response1", write1.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response2", write2.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response3", write3.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks that a single write whose four payloads (each a quarter of the max batch size)
     * are submitted together fails with {@link RecordTooLargeException}.
     */
    @Test
    public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(writeTimeout)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();
        runtime.scheduleLoadOperation(TP, 10);

        // Initial state: nothing written or committed, no open batch.
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four payloads, each a quarter of the max batch size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(fill -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            return new String(chars);
        }).collect(Collectors.toList());

        // Submitting all four payloads in one operation must be rejected.
        CompletableFuture write = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads, "response1"));
        TestUtils.assertFutureThrows(write, RecordTooLargeException.class);
    }

    /**
     * Checks that when writing a batch fails, every write that was part of it, as well as the
     * write that triggered it, fails with {@link KafkaException} and the coordinator state is
     * rolled back.
     *
     * <p>NOTE(review): the writer is built with {@code new MockPartitionWriter(0)}; presumably
     * this makes appends fail from the start. Confirm against {@code MockPartitionWriter}.
     */
    @Test
    public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter(0);
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(writeTimeout)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();
        runtime.scheduleLoadOperation(TP, 10);

        // Initial state: nothing written or committed, no open batch.
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four payloads, each a quarter of the max batch size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2', '3', '4').map(fill -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            return new String(chars);
        }).collect(Collectors.toList());

        // Three single-payload writes accumulate without reaching the log.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(0, 1), "response1"));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(1, 2), "response2"));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(2, 3), "response3"));
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            List.of(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // The fourth write is the one after which all four futures are expected to have failed.
        CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(3, 4), "response4"));
        TestUtils.assertFutureThrows(write1, KafkaException.class);
        TestUtils.assertFutureThrows(write2, KafkaException.class);
        TestUtils.assertFutureThrows(write3, KafkaException.class);
        TestUtils.assertFutureThrows(write4, KafkaException.class);

        // Everything is back to the initial state: no replayed records, nothing in the log.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptyList(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));
    }

    /**
     * Checks that a replay failure in a later write also fails the earlier write sharing the
     * same pending batch, and that the coordinator state is rolled back.
     *
     * <p>The loaded coordinator is swapped for one whose shard throws
     * {@link IllegalArgumentException} from {@code replay} for any offset {@code >= 1}, so
     * the first write replays fine and the second one fails.
     */
    @Test
    public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(writeTimeout)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();
        runtime.scheduleLoadOperation(TP, 10);

        // Initial state: nothing written or committed, no open batch.
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Replace the coordinator with one whose shard rejects replays at offset 1 and beyond,
        // reusing the existing snapshot registry.
        SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
        ctx.coordinator = new SnapshottableCoordinator(
            new LogContext(),
            snapshotRegistry,
            new MockCoordinatorShard(snapshotRegistry, (CoordinatorTimer) ctx.timer) {

                @Override
                public void replay(long offset, long producerId, short producerEpoch, String record) throws RuntimeException {
                    if (offset >= 1L) {
                        throw new IllegalArgumentException("error");
                    }
                    super.replay(offset, producerId, producerEpoch, record);
                }
            },
            TP);

        // Two payloads, each a quarter of the max batch size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = Stream.of('1', '2').map(fill -> {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            return new String(chars);
        }).collect(Collectors.toList());

        // Write #1 replays at offset 0 and is accepted; nothing reaches the log yet.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(0, 1), "response1"));
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            List.of(new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #2 is scheduled next; afterwards both futures are expected to have failed.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(1, 2), "response2"));
        TestUtils.assertFutureThrows(write1, IllegalArgumentException.class);
        TestUtils.assertFutureThrows(write2, IllegalArgumentException.class);

        // Everything is back to the initial state: no replayed records, nothing in the log.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptyList(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));
    }

    /**
     * Exercises batching when regular writes, a transactional write and a transaction
     * completion are interleaved.
     *
     * <p>The assertions establish that: a regular write is held back (nothing in the log);
     * scheduling the transactional write results in both the regular and the transactional
     * record being written; a following regular write is held back again; and scheduling the
     * transaction completion results in that record plus a COMMIT end-transaction marker
     * being written. After the writer commits, all four futures are complete.
     */
    @Test
    public void testScheduleTransactionalWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(writeTimeout)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();
        runtime.scheduleLoadOperation(TP, 10);

        // Initial state: nothing written or committed, no open batch.
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Regular write #1: replayed to the shard, not yet in the log.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(List.of("record#1"), "response1"));
        Assertions.assertFalse(write1.isDone());
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Set.of("record#1"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Transactional write: both record#1 and the transactional record#2 are now in the log.
        CompletableFuture write2 = runtime.scheduleTransactionalWriteOperation(
            "txn-write#1", TP, "transactional-id", 100L, (short) 50, writeTimeout,
            state -> new CoordinatorResult(List.of("record#2"), "response2"),
            Short.valueOf(TXN_OFFSET_COMMIT_LATEST_VERSION));
        Assertions.assertFalse(write2.isDone());
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record#2"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Set.of("record#1"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            List.of(
                CoordinatorRuntimeTest.records(timer.time().milliseconds(), "record#1"),
                CoordinatorRuntimeTest.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")),
            writer.entries(TP));

        // Regular write #3: replayed to the shard, the log is unchanged.
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout,
            state -> new CoordinatorResult(List.of("record#3"), "response3"));
        Assertions.assertFalse(write3.isDone());
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Set.of("record#2"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Set.of("record#1", "record#3"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            List.of(
                CoordinatorRuntimeTest.records(timer.time().milliseconds(), "record#1"),
                CoordinatorRuntimeTest.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")),
            writer.entries(TP));

        // Transaction completion: record#3 and the COMMIT marker are now in the log, and the
        // pending transactional record has moved to the committed set.
        CompletableFuture complete1 = runtime.scheduleTransactionCompletion(
            "complete#1", TP, 100L, (short) 50, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        Assertions.assertFalse(complete1.isDone());
        Assertions.assertEquals(4L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptySet(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).pendingRecords(100L));
        Assertions.assertEquals(Set.of("record#1", "record#2", "record#3"), ((MockCoordinatorShard) ctx.coordinator.coordinator()).records());
        Assertions.assertEquals(
            List.of(
                CoordinatorRuntimeTest.records(timer.time().milliseconds(), "record#1"),
                CoordinatorRuntimeTest.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2"),
                CoordinatorRuntimeTest.records(timer.time().milliseconds(), "record#3"),
                CoordinatorRuntimeTest.endTransactionMarker(100L, (short) 50, timer.time().milliseconds(), 10, ControlRecordType.COMMIT)),
            writer.entries(TP));

        // Committing completes every operation; the completion future carries no value.
        writer.commit(TP);
        Assertions.assertTrue(write1.isDone());
        Assertions.assertTrue(write2.isDone());
        Assertions.assertTrue(write3.isDone());
        Assertions.assertTrue(complete1.isDone());
        Assertions.assertEquals("response1", write1.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response2", write2.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response3", write3.get(5L, TimeUnit.SECONDS));
        Assertions.assertNull(complete1.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Exercises the runtime with a partition writer that reports every append offset as one
     * higher than what the underlying mock writer returned.
     *
     * <p>Four writes, each carrying a single record of {@code maxBatchSize / 4} characters, are
     * scheduled. The test expects all four futures to fail with
     * {@link NotCoordinatorException}, the loader to have been invoked twice for the partition,
     * and the context to be ACTIVE with a coordinator instance that is not equal to the one
     * captured right after the initial load.
     */
    @Test
    public void testStateMachineIsReloadedWhenOutOfSync() {
        MockTimer timer = new MockTimer();
        MockCoordinatorLoader loader = Mockito.spy(new MockCoordinatorLoader());
        // Writer that inflates the offset returned by each append by one.
        MockPartitionWriter writer = new MockPartitionWriter() {
            @Override
            public long append(TopicPartition tp, VerificationGuard verificationGuard, MemoryRecords batch) {
                long actualOffset = super.append(tp, verificationGuard, batch);
                return actualOffset + 1L;
            }
        };
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(loader)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);

        // State right after the initial load.
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Remember the coordinator instance so that a replacement can be detected later.
        SnapshottableCoordinator initialCoordinator = ctx.coordinator;

        // Four records, each a quarter of the maximum message size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = new ArrayList<>();
        for (char fill : new char[] {'1', '2', '3', '4'}) {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            payloads.add(new String(chars));
        }

        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(0, 1), "response1"));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(1, 2), "response2"));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(2, 3), "response3"));
        CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(3, 4), "response4"));

        // Every write is expected to be failed with NotCoordinatorException.
        TestUtils.assertFutureThrows(write1, NotCoordinatorException.class);
        TestUtils.assertFutureThrows(write2, NotCoordinatorException.class);
        TestUtils.assertFutureThrows(write3, NotCoordinatorException.class);
        TestUtils.assertFutureThrows(write4, NotCoordinatorException.class);

        // The loader ran a second time and the coordinator instance was replaced.
        Mockito.verify(loader, Mockito.times(2)).load(ArgumentMatchers.eq(TP), ArgumentMatchers.any());
        Assertions.assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
        Assertions.assertNotEquals(initialCoordinator, ctx.coordinator);
    }

    /**
     * Loads the coordinator through a loader that replays two records and leaves the last
     * written offset (2) ahead of the last committed offset (1).
     *
     * <p>A write producing no records is then scheduled. The test expects its future to stay
     * incomplete until the high watermark listener is notified with offset 2, after which the
     * committed offset is 2 and the future yields {@code "response1"}.
     */
    @Test
    public void testWriteOpIsNotReleasedWhenStateMachineIsNotCaughtUpAfterLoad() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        // Loader that finishes with lastWrittenOffset = 2 and lastCommittedOffset = 1.
        CoordinatorLoader<String> loader = new CoordinatorLoader<String>() {
            @Override
            public CompletableFuture<CoordinatorLoader.LoadSummary> load(TopicPartition tp, CoordinatorPlayback<String> coordinator) {
                coordinator.replay(0L, -1L, (short) -1, "record#0");
                coordinator.replay(0L, -1L, (short) -1, "record#1");
                coordinator.updateLastWrittenOffset(2L);
                coordinator.updateLastCommittedOffset(1L);
                CoordinatorLoader.LoadSummary summary = new CoordinatorLoader.LoadSummary(0L, 0L, 0L, 2L, 1L);
                return CompletableFuture.completedFuture(summary);
            }

            @Override
            public void close() {
            }
        };
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(loader)
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);

        // Offsets are exactly what the loader reported.
        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(1L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(2L), ctx.coordinator.snapshotRegistry().epochsList());

        // A write without records must not complete while the committed offset lags behind.
        CompletableFuture write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20L),
            state -> new CoordinatorResult(Collections.emptyList(), "response1"));
        Assertions.assertFalse(write.isDone());

        // Move the high watermark up to the last written offset.
        ctx.highWatermarklistener.onHighWatermarkUpdated(TP, 2L);

        Assertions.assertEquals(2L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(2L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(2L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals("response1", write.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Schedules the same four records (each {@code maxBatchSize / 4} characters) twice: once
     * through the two-argument {@code CoordinatorResult} constructor and once through the
     * five-argument constructor with trailing arguments {@code (null, true, false)}.
     *
     * <p>The first write is expected to fail with {@link RecordTooLargeException}. The second
     * one is expected to be split: the first three records are written immediately, the fourth
     * is written once the mock clock has advanced by 11 ms (the append linger is 10 ms), and the
     * future completes only after offset 4 is committed.
     *
     * <p>NOTE(review): the trailing {@code (null, true, false)} arguments presumably mean
     * "no append future, replay records, not atomic" - confirm against CoordinatorResult.
     */
    @Test
    public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);

        // Initial state.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four records, each a quarter of the maximum message size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = new ArrayList<>();
        for (char fill : new char[] {'1', '2', '3', '4'}) {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            payloads.add(new String(chars));
        }
        // All four records with the offsets at which the shard is expected to have replayed them.
        List<MockCoordinatorShard.RecordAndMetadata> allReplayed = List.of(
            new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
            new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
            new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)),
            new MockCoordinatorShard.RecordAndMetadata(3L, payloads.get(3)));

        // Write #1: all records through the two-argument result; rejected as too large.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads, "write#1"));
        TestUtils.assertFutureThrows(write1, RecordTooLargeException.class);

        // Write #2: same records through the five-argument result.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(payloads, "write#2", null, true, false));
        Assertions.assertFalse(write2.isDone());

        // Three records have reached the writer; all four are replayed and a batch is open.
        Assertions.assertNotNull(ctx.currentBatch);
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(allReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(
            List.of(records(timer.time().milliseconds(), payloads.subList(0, 3))),
            writer.entries(TP));

        // Committing up to offset 3 is not enough to complete the write.
        writer.commit(TP, 3L);
        Assertions.assertFalse(write2.isDone());

        // Advance the clock by 11 ms; the open batch is gone and the fourth record is written.
        timer.advanceClock(11L);
        Assertions.assertNull(ctx.currentBatch);
        Assertions.assertEquals(4L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(3L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(allReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(
            List.of(
                records(timer.time().milliseconds() - 11L, payloads.subList(0, 3)),
                records(timer.time().milliseconds() - 11L, payloads.subList(3, 4))),
            writer.entries(TP));

        // Committing offset 4 releases the write.
        writer.commit(TP, 4L);
        Assertions.assertTrue(write2.isDone());
        Assertions.assertEquals("write#2", write2.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Schedules a write of three records ({@code maxBatchSize / 4} characters each) followed by
     * a write of a single record of {@code maxBatchSize} characters, both through the
     * five-argument {@code CoordinatorResult} constructor with {@code (null, true, false)}.
     *
     * <p>After the mock clock advances by 11 ms, the second write is expected to fail with
     * {@link RecordTooLargeException} while the first write stays pending, its three records
     * written to the log (last written offset 3) and still present in the shard.
     */
    @Test
    public void testScheduleNonAtomicWriteOperationWithRecordTooLarge() throws InterruptedException {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);

        // Initial state.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Three records that fit together, plus one record as long as the maximum message size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = new ArrayList<>();
        for (char fill : new char[] {'1', '2', '3'}) {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            payloads.add(new String(chars));
        }
        char[] oversizedChars = new char[maxBatchSize];
        Arrays.fill(oversizedChars, '4');
        String oversized = new String(oversizedChars);

        List<MockCoordinatorShard.RecordAndMetadata> firstThreeReplayed = List.of(
            new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
            new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
            new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)));

        // Write #1: replayed into the shard but nothing has reached the writer yet.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads, "write#1", null, true, false));
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(firstThreeReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #2: the oversized record.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(List.of(oversized), "write#2", null, true, false));

        timer.advanceClock(11L);

        // Only write #2 fails; write #1 is still waiting.
        TestUtils.assertFutureThrows(write2, RecordTooLargeException.class);
        Assertions.assertFalse(write1.isDone());

        // The three records of write #1 were written and remain in the shard.
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(firstThreeReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(
            List.of(records(timer.time().milliseconds() - 11L, payloads.subList(0, 3))),
            writer.entries(TP));
    }

    /**
     * Schedules four single-record writes (five-argument {@code CoordinatorResult} with
     * {@code (null, true, false)}) against a writer built with {@code new MockPartitionWriter(0)}.
     *
     * <p>After the first three writes the records are replayed in the shard and nothing has
     * been written. Once the fourth write is scheduled, all four futures are expected to fail
     * with {@link KafkaException}, and the shard, the offsets and the writer are expected to be
     * back at their initial, empty state.
     *
     * <p>NOTE(review): the {@code 0} passed to the writer presumably caps the number of allowed
     * appends so that every append fails - confirm against MockPartitionWriter.
     */
    @Test
    public void testScheduleNonAtomicWriteOperationWhenWriteFails() {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter(0);
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);

        // Initial state.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertNull(ctx.currentBatch);

        // Four records, each a quarter of the maximum message size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = new ArrayList<>();
        for (char fill : new char[] {'1', '2', '3', '4'}) {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            payloads.add(new String(chars));
        }

        // Writes #1 to #3 accumulate without anything reaching the writer.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(0, 1), "response1", null, true, false));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(1, 2), "response2", null, true, false));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(2, 3), "response3", null, true, false));

        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(
            List.of(
                new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
                new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
                new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2))),
            ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));

        // Write #4 is the one after which all pending writes are observed to fail.
        CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(3, 4), "response4", null, true, false));

        TestUtils.assertFutureThrows(write1, KafkaException.class);
        TestUtils.assertFutureThrows(write2, KafkaException.class);
        TestUtils.assertFutureThrows(write3, KafkaException.class);
        TestUtils.assertFutureThrows(write4, KafkaException.class);

        // Everything is back to the initial state: no records in the shard, nothing written.
        Assertions.assertEquals(0L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
        Assertions.assertEquals(Collections.emptyList(), ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));
    }

    /**
     * Arms a {@code ThrowingSerializer} to throw on its next operation and schedules a write.
     *
     * <p>The write is expected to fail with {@link BufferOverflowException} while leaving a
     * non-null current batch behind. A following write without records is expected to complete
     * immediately with {@code "response2"}, and a subsequent transaction completion is expected
     * to stay pending until {@code writer.commit(TP)} and then complete with {@code null}.
     */
    @Test
    public void testEmptyBatch() throws Exception {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ThrowingSerializer<String> serializer = new ThrowingSerializer<>(new StringSerializer());
        Duration writeTimeout = Duration.ofMillis(20L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(Mockito.mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(serializer)
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertNull(ctx.currentBatch);

        // Write #1 fails because the serializer throws on its next operation.
        serializer.throwOnNextOperation();
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(List.of("1"), "response1"));
        TestUtils.assertFutureThrows(write1, BufferOverflowException.class);

        // A batch object is still present after the failure.
        Assertions.assertNotNull(ctx.currentBatch);

        // Write #2 carries no records and completes right away.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(Collections.emptyList(), "response2"));
        Assertions.assertTrue(write2.isDone());
        Assertions.assertEquals("response2", write2.get(5L, TimeUnit.SECONDS));

        // The transaction completion stays pending until the writer commits.
        CompletableFuture complete1 = runtime.scheduleTransactionCompletion(
            "complete#1", TP, 100L, (short) 50, 10, TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT);
        Assertions.assertFalse(complete1.isDone());

        writer.commit(TP);
        Assertions.assertNull(complete1.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks how often {@code recordFlushTime(10L)} is reported on the runtime metrics mock
     * while three writes are batched and flushed.
     *
     * <p>Writes #1 (two records) and #2 (one record) produce no log entry and no flush-time
     * report. Write #3 (one record) leads to a first log entry holding the first three records
     * and one report. Advancing the mock clock by 11 ms (the append linger is 10 ms) leads to a
     * second log entry and a second report. After {@code writer.commit(TP)} all three futures
     * are done with their respective responses.
     *
     * <p>NOTE(review): the writer is built with {@code (time, Integer.MAX_VALUE, false)};
     * the expected flush time of 10 presumably comes from that writer advancing the mock time
     * on append - confirm against MockPartitionWriter.
     */
    @Test
    public void testRecordFlushTime() throws Exception {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter(timer.time(), Integer.MAX_VALUE, false);
        CoordinatorRuntimeMetrics runtimeMetrics = Mockito.mock(CoordinatorRuntimeMetrics.class);
        Duration writeTimeout = Duration.ofMillis(50L);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(Duration.ofMillis(20L))
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withAppendLingerMs(10)
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertNull(ctx.currentBatch);

        // Four records, each a quarter of the maximum message size.
        int maxBatchSize = writer.config(TP).maxMessageSize();
        List<String> payloads = new ArrayList<>();
        for (char fill : new char[] {'1', '2', '3', '4'}) {
            char[] chars = new char[maxBatchSize / 4];
            Arrays.fill(chars, fill);
            payloads.add(new String(chars));
        }
        List<MockCoordinatorShard.RecordAndMetadata> allReplayed = List.of(
            new MockCoordinatorShard.RecordAndMetadata(0L, payloads.get(0)),
            new MockCoordinatorShard.RecordAndMetadata(1L, payloads.get(1)),
            new MockCoordinatorShard.RecordAndMetadata(2L, payloads.get(2)),
            new MockCoordinatorShard.RecordAndMetadata(3L, payloads.get(3)));

        // Write #1 (two records) opens a batch.
        long firstBatchTimestamp = timer.time().milliseconds();
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(0, 2), "response1"));
        Assertions.assertNotNull(ctx.currentBatch);

        // Write #2 (one record): still nothing written and no flush time recorded.
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(2, 3), "response2"));
        Assertions.assertEquals(Collections.emptyList(), writer.entries(TP));
        Mockito.verify(runtimeMetrics, Mockito.times(0)).recordFlushTime(10L);

        // Write #3 (one record): the first three records show up in the log.
        long secondBatchTimestamp = timer.time().milliseconds();
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout,
            state -> new CoordinatorResult(payloads.subList(3, 4), "response3"));
        Assertions.assertEquals(3L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(allReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(
            List.of(records(firstBatchTimestamp, payloads.subList(0, 3))),
            writer.entries(TP));
        Mockito.verify(runtimeMetrics, Mockito.times(1)).recordFlushTime(10L);

        // Advance the clock by 11 ms: the fourth record is written as a second entry.
        timer.advanceClock(11L);
        Assertions.assertEquals(4L, ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals(0L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(allReplayed, ((MockCoordinatorShard) ctx.coordinator.coordinator()).fullRecords());
        Assertions.assertEquals(
            List.of(
                records(secondBatchTimestamp, payloads.subList(0, 3)),
                records(secondBatchTimestamp, payloads.subList(3, 4))),
            writer.entries(TP));
        Mockito.verify(runtimeMetrics, Mockito.times(2)).recordFlushTime(10L);

        // Commit everything; all writes complete with their responses.
        writer.commit(TP);
        Assertions.assertTrue(write1.isDone());
        Assertions.assertTrue(write2.isDone());
        Assertions.assertTrue(write3.isDone());
        Assertions.assertEquals(4L, ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals("response1", write1.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response2", write2.get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals("response3", write3.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Checks the values passed to {@code recordEventPurgatoryTime} on the runtime metrics mock,
     * driving events by hand through a {@code ManualEventProcessor}.
     *
     * <p>Three writes are scheduled with a 1000 ms timeout; the third throws a
     * {@link KafkaException} and is expected to complete exceptionally without a purgatory time
     * of 0 being recorded. After 700 ms offset 1 is committed and the resulting event is polled;
     * after a further 301 ms another event is polled. Write #1 is then expected to be done,
     * write #2 to have completed exceptionally, and the metric to have been recorded once with
     * the elapsed time measured after the first of those polls and once with the timeout plus
     * one millisecond.
     */
    @Test
    public void testRecordEventPurgatoryTime() throws Exception {
        Duration writeTimeout = Duration.ofMillis(1000L);
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntimeMetrics runtimeMetrics = Mockito.mock(CoordinatorRuntimeMetrics.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(writeTimeout)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Two events are polled after scheduling the load.
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();

        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(List.of("record1"), "response1"));
        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
            state -> new CoordinatorResult(List.of("record2"), "response2"));
        CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout, state -> {
            throw new KafkaException("write#3 failed.");
        });

        // One poll per scheduled write.
        processor.poll();
        processor.poll();
        processor.poll();

        // The failing write completed exceptionally and no purgatory time of 0 was recorded.
        Assertions.assertTrue(write3.isCompletedExceptionally());
        Mockito.verify(runtimeMetrics, Mockito.times(0)).recordEventPurgatoryTime(0L);

        // Both successful writes reached the log; no high watermark update is pending.
        long writeTimestamp = timer.time().milliseconds();
        Assertions.assertEquals(
            List.of(records(writeTimestamp, "record1"), records(writeTimestamp, "record2")),
            writer.entries(TP));
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // 700 ms later, commit offset 1: one event is queued for the high watermark update.
        timer.advanceClock(700L);
        writer.commit(TP, 1L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        processor.poll();
        long purgatoryTimeMs = timer.time().milliseconds() - writeTimestamp;

        // A further 301 ms brings the total past the 1000 ms write timeout.
        timer.advanceClock(301L);
        processor.poll();

        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
        Assertions.assertTrue(write1.isDone());
        Assertions.assertTrue(write2.isCompletedExceptionally());
        Mockito.verify(runtimeMetrics, Mockito.times(1)).recordEventPurgatoryTime(purgatoryTimeMs);
        Mockito.verify(runtimeMetrics, Mockito.times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
    }

    /**
     * Checks that a write which has already completed exceptionally is not completed (or
     * measured) a second time when its offset is committed afterwards.
     *
     * <p>The mock clock is advanced past the 1000 ms write timeout before anything is
     * committed; the write is expected to complete exceptionally and the purgatory time
     * (timeout + 1 ms) to be recorded once. Committing offset 1 afterwards and polling the
     * resulting event is expected to leave the future exceptionally completed and the metric
     * still recorded exactly once.
     */
    @Test
    public void testWriteEventCompletesOnlyOnce() throws Exception {
        Duration writeTimeout = Duration.ofMillis(1000L);
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntimeMetrics runtimeMetrics = Mockito.mock(CoordinatorRuntimeMetrics.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder()
            .withTime(timer.time())
            .withTimer(timer)
            .withDefaultWriteTimeOut(writeTimeout)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(processor)
            .withPartitionWriter(writer)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(Mockito.mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(Mockito.mock(ExecutorService.class))
            .build();

        // Two events are polled after scheduling the load.
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();

        // Schedule the write and process its event.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
            state -> new CoordinatorResult(List.of("record1"), "response1"));
        processor.poll();

        // The record is in the log and no high watermark update is pending.
        long writeTimestamp = timer.time().milliseconds();
        Assertions.assertEquals(
            Collections.singletonList(records(writeTimestamp, "record1")),
            writer.entries(TP));
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

        // Let the write time out before anything is committed.
        timer.advanceClock(writeTimeout.toMillis() + 1L);
        processor.poll();
        Mockito.verify(runtimeMetrics, Mockito.times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
        Assertions.assertTrue(write1.isCompletedExceptionally());

        // Commit afterwards: one event is queued for the high watermark update.
        writer.commit(TP, 1L);
        Assertions.assertEquals(1, processor.size());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        processor.poll();

        // The late commit advances the committed offset but does not complete the write again.
        Assertions.assertEquals(-1L, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals(1L, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
        Assertions.assertTrue(write1.isCompletedExceptionally());
        Mockito.verify(runtimeMetrics, Mockito.times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
    }

    /**
     * Checks that a transaction completion whose timeout fires before its end marker is committed
     * is completed only once: the future fails on timeout, and a commit arriving afterwards
     * neither changes the future's state nor records a second purgatory-time sample.
     */
    @Test
    public void testCompleteTransactionEventCompletesOnlyOnce() throws Exception {
        Duration writeTimeout = Duration.ofMillis(1000L);
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        // Events are only executed when the test calls processor.poll().
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntimeMetrics runtimeMetrics = (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class);
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(writeTimeout).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)processor).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics(runtimeMetrics).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class)).withSerializer((Serializer)new StringSerializer()).withExecutorService((ExecutorService)Mockito.mock(ExecutorService.class)).build();
        // Schedule the load and run the next two queued events
        // (presumably the load itself and its completion -- confirm against CoordinatorRuntime).
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();
        // Schedule a COMMIT transaction completion and run it.
        CompletableFuture write1 = runtime.scheduleTransactionCompletion("transactional-write", TP, 100L, (short)50, 1, TransactionResult.COMMIT, writeTimeout);
        processor.poll();
        // The writer holds the matching COMMIT end marker, one timer task is pending,
        // and the future is not done yet.
        Assertions.assertEquals(List.of(CoordinatorRuntimeTest.endTransactionMarker(100L, (short)50, timer.time().milliseconds(), 1, ControlRecordType.COMMIT)), writer.entries(TP));
        Assertions.assertEquals((int)1, (int)timer.size());
        Assertions.assertFalse((boolean)write1.isDone());
        // Move the mock clock one millisecond past the timeout and run the next queued event.
        timer.advanceClock(writeTimeout.toMillis() + 1L);
        processor.poll();
        // The timeout produced exactly one purgatory-time sample and failed the future.
        ((CoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)1))).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
        Assertions.assertTrue((boolean)write1.isCompletedExceptionally());
        // Commit offset 1 after the timeout: one event is queued and the listener reports 1.
        writer.commit(TP, 1L);
        Assertions.assertEquals((int)1, (int)processor.size());
        Assertions.assertEquals((long)1L, (long)runtime.contextOrThrow((TopicPartition)CoordinatorRuntimeTest.TP).highWatermarklistener.lastHighWatermark());
        // After running that event the listener reports -1 again and the committed offset is 1.
        processor.poll();
        Assertions.assertEquals((long)-1L, (long)runtime.contextOrThrow((TopicPartition)CoordinatorRuntimeTest.TP).highWatermarklistener.lastHighWatermark());
        Assertions.assertEquals((long)1L, (long)runtime.contextOrThrow((TopicPartition)CoordinatorRuntimeTest.TP).coordinator.lastCommittedOffset());
        // The future is still failed, and there is still only one purgatory-time call with this value.
        Assertions.assertTrue((boolean)write1.isCompletedExceptionally());
        ((CoordinatorRuntimeMetrics)Mockito.verify((Object)runtimeMetrics, (VerificationMode)Mockito.times((int)1))).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
    }

    /**
     * Checks that a task scheduled on the shard's executor from inside a write operation runs,
     * that its completion callback receives the task result, and that the records returned by the
     * callback are written after the records of the original write.
     */
    @Test
    public void testCoordinatorExecutor() {
        Duration writeTimeout = Duration.ofMillis(1000L);
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        // Events are only executed when the test calls processor.poll().
        ManualEventProcessor processor = new ManualEventProcessor();
        CoordinatorRuntimeMetrics runtimeMetrics = (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class);
        // The mocked executor service runs every submitted Runnable inline on the calling thread
        // and hands back an already-completed future.
        ExecutorService executorService = (ExecutorService)Mockito.mock(ExecutorService.class);
        Mockito.when(executorService.submit((Runnable)ArgumentMatchers.any(Runnable.class))).thenAnswer(args -> {
            Runnable op = (Runnable)args.getArgument(0);
            op.run();
            return CompletableFuture.completedFuture(null);
        });
        CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime((Time)timer.time()).withTimer((Timer)timer).withDefaultWriteTimeOut(writeTimeout).withLoader((CoordinatorLoader)new MockCoordinatorLoader()).withEventProcessor((CoordinatorEventProcessor)processor).withPartitionWriter((PartitionWriter)writer).withCoordinatorShardBuilderSupplier((CoordinatorShardBuilderSupplier)new MockCoordinatorShardBuilderSupplier()).withCoordinatorRuntimeMetrics(runtimeMetrics).withCoordinatorMetrics((CoordinatorMetrics)Mockito.mock(CoordinatorMetrics.class)).withSerializer((Serializer)new StringSerializer()).withExecutorService(executorService).build();
        // Schedule the load and run the next two queued events
        // (presumably the load itself and its completion -- confirm against CoordinatorRuntime).
        runtime.scheduleLoadOperation(TP, 10);
        processor.poll();
        processor.poll();
        // The write schedules an executor task and returns "record1" itself.
        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout, state -> {
            // The task yields "task result"; its callback checks that value and emits "record2".
            state.executor.schedule("write#1#task", () -> "task result", (result, exception) -> {
                Assertions.assertEquals((Object)"task result", (Object)result);
                Assertions.assertNull((Object)exception);
                return new CoordinatorResult(Collections.singletonList("record2"), null);
            });
            return new CoordinatorResult(Collections.singletonList("record1"), (Object)"response1");
        });
        // Run the write: one follow-up event is now queued and only "record1" (offset 0) is applied.
        processor.poll();
        Assertions.assertEquals((int)1, (int)processor.queue.size());
        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        Assertions.assertEquals((long)1L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(new MockCoordinatorShard.RecordAndMetadata(0L, "record1")), ((MockCoordinatorShard)ctx.coordinator.coordinator()).fullRecords());
        // Run the follow-up event: the queue is empty and "record2" is applied at offset 1.
        processor.poll();
        Assertions.assertEquals((int)0, (int)processor.queue.size());
        Assertions.assertEquals((long)2L, (long)ctx.coordinator.lastWrittenOffset());
        Assertions.assertEquals((long)0L, (long)ctx.coordinator.lastCommittedOffset());
        Assertions.assertEquals(List.of(new MockCoordinatorShard.RecordAndMetadata(0L, "record1"), new MockCoordinatorShard.RecordAndMetadata(1L, "record2")), ((MockCoordinatorShard)ctx.coordinator.coordinator()).fullRecords());
        // Commit through the writer and run the resulting event; the original write is then done.
        writer.commit(TP);
        processor.poll();
        Assertions.assertTrue((boolean)write1.isDone());
    }

    /**
     * Builds a Mockito matcher that accepts a playback equal to the coordinator held in the
     * runtime's context for {@code tp}.
     *
     * <p>The context is looked up every time the matcher is evaluated, not when it is created.
     *
     * @param runtime runtime whose context is consulted
     * @param tp      partition whose coordinator is expected
     * @return matcher comparing the candidate with that coordinator via {@code equals}
     */
    private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(CoordinatorRuntime<S, U> runtime, TopicPartition tp) {
        return candidate -> {
            Object expected = runtime.contextOrThrow(tp).coordinator;
            return candidate.equals(expected);
        };
    }

    /**
     * Loader stub that performs no real loading: it feeds preconfigured written and committed
     * offsets to the playback and returns an already-completed future holding the configured
     * summary.
     */
    private static class MockCoordinatorLoader implements CoordinatorLoader<String> {
        private final CoordinatorLoader.LoadSummary summary;
        private final List<Long> lastWrittenOffsets;
        private final List<Long> lastCommittedOffsets;

        /** Creates a loader with a {@code null} summary and no offsets to report. */
        public MockCoordinatorLoader() {
            this(null, Collections.emptyList(), Collections.emptyList());
        }

        /**
         * @param summary              value the future returned by {@link #load} completes with
         * @param lastWrittenOffsets   offsets passed, in order, to {@code updateLastWrittenOffset}
         * @param lastCommittedOffsets offsets passed, in order, to {@code updateLastCommittedOffset}
         */
        public MockCoordinatorLoader(CoordinatorLoader.LoadSummary summary, List<Long> lastWrittenOffsets, List<Long> lastCommittedOffsets) {
            this.summary = summary;
            this.lastWrittenOffsets = lastWrittenOffsets;
            this.lastCommittedOffsets = lastCommittedOffsets;
        }

        /**
         * Reports all written offsets, then all committed offsets, to {@code replayable} and
         * returns a completed future. {@code tp} is not used.
         */
        public CompletableFuture<CoordinatorLoader.LoadSummary> load(TopicPartition tp, CoordinatorPlayback<String> replayable) {
            for (Long writtenOffset : lastWrittenOffsets) {
                replayable.updateLastWrittenOffset(writtenOffset);
            }
            for (Long committedOffset : lastCommittedOffsets) {
                replayable.updateLastCommittedOffset(committedOffset);
            }
            return CompletableFuture.completedFuture(summary);
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * In-memory partition writer with failure injection for tests: it can cap the number of
     * append attempts, reject control batches, and advances its clock by 10 ms on every
     * successful append.
     */
    private static class MockPartitionWriter extends InMemoryPartitionWriter {
        private final Time time;
        private final int maxWrites;
        private final boolean failEndMarker;
        // Counts append attempts that passed the size and emptiness checks.
        private final AtomicInteger writeCount = new AtomicInteger(0);

        /** Writer with no write cap that accepts control batches. */
        public MockPartitionWriter() {
            this(new MockTime(), Integer.MAX_VALUE, false);
        }

        /** Writer that fails once more than {@code maxWrites} appends have been attempted. */
        public MockPartitionWriter(int maxWrites) {
            this(new MockTime(), maxWrites, false);
        }

        /** Writer that, when {@code failEndMarker} is set, rejects control batches. */
        public MockPartitionWriter(boolean failEndMarker) {
            this(new MockTime(), Integer.MAX_VALUE, failEndMarker);
        }

        /**
         * @param time          clock advanced on every successful append
         * @param maxWrites     number of append attempts allowed before failing
         * @param failEndMarker whether appends of control batches should fail
         */
        public MockPartitionWriter(Time time, int maxWrites, boolean failEndMarker) {
            super(false);
            this.time = time;
            this.maxWrites = maxWrites;
            this.failEndMarker = failEndMarker;
        }

        /** Plain delegation to the parent implementation. */
        @Override
        public void registerListener(TopicPartition tp, PartitionWriter.Listener listener) {
            super.registerListener(tp, listener);
        }

        /** Plain delegation to the parent implementation. */
        @Override
        public void deregisterListener(TopicPartition tp, PartitionWriter.Listener listener) {
            super.deregisterListener(tp, listener);
        }

        /**
         * Validates the batch, sleeps 10 ms on the configured clock and then delegates to the
         * parent writer.
         *
         * @throws RecordTooLargeException if the batch exceeds the partition's max message size
         * @throws KafkaException if the batch is empty, the write cap is exceeded, or end markers
         *         are configured to fail and the first batch is a control batch
         */
        @Override
        public long append(TopicPartition tp, VerificationGuard verificationGuard, MemoryRecords batch) {
            rejectIfNotWritable(tp, batch);
            time.sleep(10L);
            return super.append(tp, verificationGuard, batch);
        }

        // Runs the failure checks in a fixed order; the write counter is only bumped once the
        // size and emptiness checks have passed.
        private void rejectIfNotWritable(TopicPartition tp, MemoryRecords batch) {
            if (batch.sizeInBytes() > config(tp).maxMessageSize()) {
                throw new RecordTooLargeException("Batch is larger than the max message size");
            }
            if (batch.validBytes() <= 0) {
                throw new KafkaException("Coordinator tried to write an empty batch");
            }
            int attempt = writeCount.incrementAndGet();
            if (attempt > maxWrites) {
                throw new KafkaException("Maximum number of writes reached");
            }
            if (failEndMarker && batch.firstBatch().isControlBatch()) {
                throw new KafkaException("Couldn't write end marker.");
            }
        }
    }

    /** Supplier handing out a fresh {@link MockCoordinatorShardBuilder} on every call. */
    private static class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
        private MockCoordinatorShardBuilderSupplier() {
        }

        /** @return a new, unconfigured builder */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
            MockCoordinatorShardBuilder freshBuilder = new MockCoordinatorShardBuilder();
            return freshBuilder;
        }
    }

    /**
     * Builder for {@link MockCoordinatorShard}. Only the snapshot registry, timer and executor
     * are retained; every other setting is accepted and ignored.
     */
    private static class MockCoordinatorShardBuilder implements CoordinatorShardBuilder<MockCoordinatorShard, String> {
        private SnapshotRegistry snapshotRegistry;
        private CoordinatorTimer<Void, String> timer;
        private CoordinatorExecutor<String> executor;

        private MockCoordinatorShardBuilder() {
        }

        /** Remembers the registry handed to the shard. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        /** Ignored. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withLogContext(LogContext logContext) {
            return this;
        }

        /** Ignored. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(Time time) {
            return this;
        }

        /** Remembers the executor handed to the shard. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withExecutor(CoordinatorExecutor<String> executor) {
            this.executor = executor;
            return this;
        }

        /** Remembers the timer handed to the shard. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(CoordinatorTimer<Void, String> timer) {
            this.timer = timer;
            return this;
        }

        /** Ignored. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
            return this;
        }

        /** Ignored. */
        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(TopicPartition topicPartition) {
            return this;
        }

        /**
         * Creates the shard.
         *
         * @throws NullPointerException if the registry, timer or executor was never supplied
         *         (checked in that order)
         */
        public MockCoordinatorShard build() {
            SnapshotRegistry registry = Objects.requireNonNull(snapshotRegistry);
            CoordinatorTimer<Void, String> shardTimer = Objects.requireNonNull(timer);
            CoordinatorExecutor<String> shardExecutor = Objects.requireNonNull(executor);
            return new MockCoordinatorShard(registry, shardTimer, shardExecutor);
        }
    }

    /**
     * Coordinator shard used by the tests. Replayed records are kept in snapshot-registry-backed
     * collections: records replayed with producer id {@code -1} go straight into the main set,
     * all others are staged per producer id until an end-transaction marker is replayed.
     */
    static class MockCoordinatorShard implements CoordinatorShard<String> {
        private final SnapshotRegistry snapshotRegistry;
        private final TimelineHashSet<RecordAndMetadata> records;
        private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>> pendingRecords;
        // The timer and executor are not used by the shard itself; they are held as fields
        // (the enclosing test class reads them directly, e.g. state.executor).
        private final CoordinatorTimer<Void, String> timer;
        private final CoordinatorExecutor<String> executor;

        /** Creates a shard without an executor ({@code executor} is {@code null}). */
        MockCoordinatorShard(SnapshotRegistry snapshotRegistry, CoordinatorTimer<Void, String> timer) {
            this(snapshotRegistry, timer, null);
        }

        /**
         * @param snapshotRegistry registry backing the timeline collections
         * @param timer            timer held by the shard
         * @param executor         executor held by the shard; may be {@code null}
         */
        MockCoordinatorShard(SnapshotRegistry snapshotRegistry, CoordinatorTimer<Void, String> timer, CoordinatorExecutor<String> executor) {
            this.snapshotRegistry = snapshotRegistry;
            this.records = new TimelineHashSet<>(snapshotRegistry, 0);
            this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
            this.timer = timer;
            this.executor = executor;
        }

        /**
         * Stores the record together with its offset and producer metadata.
         *
         * <p>A producer id of {@code -1} (presumably "no producer", i.e. a non-transactional
         * write -- confirm against RecordBatch) adds the record to the main set; any other id
         * stages it under that producer id.
         */
        public void replay(long offset, long producerId, short producerEpoch, String record) throws RuntimeException {
            RecordAndMetadata entry = new RecordAndMetadata(offset, producerId, producerEpoch, record);
            if (producerId == -1L) {
                records.add(entry);
            } else {
                pendingRecords
                    .computeIfAbsent(producerId, id -> new TimelineHashSet<>(snapshotRegistry, 0))
                    .add(entry);
            }
        }

        /**
         * Resolves the records staged for {@code producerId}: on {@code COMMIT} they are moved
         * into the main set, for any other result they are dropped. A producer without staged
         * records is a no-op.
         */
        public void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) throws RuntimeException {
            TimelineHashSet<RecordAndMetadata> staged = pendingRecords.remove(producerId);
            if (result == TransactionResult.COMMIT && staged != null) {
                records.addAll(staged);
            }
        }

        /** @return payloads staged for {@code producerId}, or an empty set when there are none */
        Set<String> pendingRecords(long producerId) {
            TimelineHashSet<RecordAndMetadata> staged = pendingRecords.get(producerId);
            if (staged == null) {
                return Collections.emptySet();
            }
            return staged.stream().map(entry -> entry.record).collect(Collectors.toUnmodifiableSet());
        }

        /** @return payloads of all records in the main set */
        Set<String> records() {
            return records.stream().map(entry -> entry.record).collect(Collectors.toUnmodifiableSet());
        }

        /** @return all records in the main set with their metadata, ordered by offset */
        List<RecordAndMetadata> fullRecords() {
            return records.stream()
                .sorted(Comparator.comparingLong((RecordAndMetadata entry) -> entry.offset))
                .collect(Collectors.toList());
        }

        /** Immutable value pairing a record payload with its offset and producer metadata. */
        static class RecordAndMetadata {
            public final long offset;
            public final long producerId;
            public final short producerEpoch;
            public final String record;

            /** Creates an entry with producer id and producer epoch both set to {@code -1}. */
            public RecordAndMetadata(long offset, String record) {
                // The explicit cast is required: an int literal is not implicitly narrowed to
                // short when passed as a constructor argument.
                this(offset, -1L, (short) -1, record);
            }

            public RecordAndMetadata(long offset, long producerId, short producerEpoch, String record) {
                this.offset = offset;
                this.producerId = producerId;
                this.producerEpoch = producerEpoch;
                this.record = record;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) {
                    return true;
                }
                if (o == null || getClass() != o.getClass()) {
                    return false;
                }
                RecordAndMetadata that = (RecordAndMetadata) o;
                return offset == that.offset
                    && producerId == that.producerId
                    && producerEpoch == that.producerEpoch
                    && Objects.equals(record, that.record);
            }

            @Override
            public int hashCode() {
                // Long.hashCode(v) is (int) (v ^ (v >>> 32)), so the values match the manual form.
                int result = Long.hashCode(offset);
                result = 31 * result + Long.hashCode(producerId);
                result = 31 * result + producerEpoch;
                result = 31 * result + (record != null ? record.hashCode() : 0);
                return result;
            }

            /**
             * Renders the entry with at most the first 10 characters of the payload. A null
             * payload is rendered as {@code null}, so building an assertion failure message
             * cannot itself fail with a NullPointerException.
             */
            @Override
            public String toString() {
                String preview = record == null
                    ? "null"
                    : "'" + record.substring(0, Math.min(10, record.length())) + "'";
                return "RecordAndMetadata(offset=" + offset + ", producerId=" + producerId + ", producerEpoch=" + producerEpoch + ", record=" + preview + ")";
            }
        }
    }

    /**
     * Event processor without a queue: every event is executed on the calling thread as soon as
     * it is enqueued, regardless of whether it was added first or last.
     */
    private static class DirectEventProcessor implements CoordinatorEventProcessor {
        private DirectEventProcessor() {
        }

        /** Runs {@code event} immediately. */
        public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
            runNow(event);
        }

        /** Runs {@code event} immediately. */
        public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
            runNow(event);
        }

        /** Nothing to release. */
        public void close() {
        }

        // Executes the event; anything it throws is handed back to the event via complete(...)
        // instead of propagating to the caller.
        private void runNow(CoordinatorEvent event) {
            try {
                event.run();
            } catch (Throwable failure) {
                event.complete(failure);
            }
        }
    }

    /**
     * Serializer for plain string records: records have no key, and the value is the string's
     * bytes in the platform default charset.
     */
    private static class StringSerializer implements Serializer<String> {
        private StringSerializer() {
        }

        /** @return always {@code null} */
        public byte[] serializeKey(String record) {
            return null;
        }

        /** @return {@code record} encoded with {@link Charset#defaultCharset()} */
        public byte[] serializeValue(String record) {
            Charset platformCharset = Charset.defaultCharset();
            return record.getBytes(platformCharset);
        }
    }

    /**
     * Event processor that only queues events; the test drives execution one event at a time
     * through {@link #poll()}.
     */
    private static class ManualEventProcessor implements CoordinatorEventProcessor {
        private final Deque<CoordinatorEvent> queue = new LinkedList<>();

        private ManualEventProcessor() {
        }

        /** Appends {@code event} to the tail of the queue. */
        public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
            queue.addLast(event);
        }

        /** Inserts {@code event} at the head of the queue. */
        public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
            queue.addFirst(event);
        }

        /**
         * Removes the head of the queue and runs it. Anything the event throws is passed to its
         * {@code complete} method instead of propagating.
         *
         * @return {@code true} if an event was taken from the queue, {@code false} if it was empty
         */
        public boolean poll() {
            CoordinatorEvent next = queue.pollFirst();
            if (next != null) {
                try {
                    next.run();
                } catch (Throwable failure) {
                    next.complete(failure);
                }
                return true;
            }
            return false;
        }

        /** @return number of events still waiting in the queue */
        public int size() {
            return queue.size();
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Serializer decorator that can be armed to fail exactly one value serialization with a
     * {@link BufferOverflowException}; all other calls are forwarded to the wrapped serializer.
     */
    private static class ThrowingSerializer<T> implements Serializer<T> {
        private final Serializer<T> serializer;
        // One-shot flag; starts out false (the field default).
        private boolean throwOnNextOperation;

        /** @param serializer delegate performing the real serialization */
        public ThrowingSerializer(Serializer<T> serializer) {
            this.serializer = serializer;
        }

        /** Arms the failure for the next {@link #serializeValue} call. */
        public void throwOnNextOperation() {
            throwOnNextOperation = true;
        }

        /** Forwards to the delegate; never affected by the armed flag. */
        public byte[] serializeKey(T record) {
            return serializer.serializeKey(record);
        }

        /**
         * Throws once if armed (disarming itself first), otherwise forwards to the delegate.
         *
         * @throws BufferOverflowException on the first call after {@link #throwOnNextOperation()}
         */
        public byte[] serializeValue(T record) {
            boolean armed = throwOnNextOperation;
            throwOnNextOperation = false;
            if (armed) {
                throw new BufferOverflowException();
            }
            return serializer.serializeValue(record);
        }
    }
}

