/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.common.runtime;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutorImpl;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class CoordinatorExecutorImplTest {
    private static final LogContext LOG_CONTEXT = new LogContext();
    private static final TopicPartition SHARD_PARTITION = new TopicPartition("__consumer_offsets", 0);
    private static final Duration WRITE_TIMEOUT = Duration.ofMillis(1000L);
    private static final String TASK_KEY = "task";

    @Test
    public void testTaskSuccessfulLifecycle() {
        CoordinatorShard coordinatorShard = (CoordinatorShard)Mockito.mock(CoordinatorShard.class);
        CoordinatorRuntime runtime = (CoordinatorRuntime)Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService)Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl executor = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, runtime, executorService, WRITE_TIMEOUT);
        Mockito.when((Object)runtime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)TASK_KEY), (TopicPartition)ArgumentMatchers.eq((Object)SHARD_PARTITION), (Duration)ArgumentMatchers.eq((Object)WRITE_TIMEOUT), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenAnswer(args -> {
            Assertions.assertTrue((boolean)executor.isScheduled(TASK_KEY));
            CoordinatorRuntime.CoordinatorWriteOperation op = (CoordinatorRuntime.CoordinatorWriteOperation)args.getArgument(3);
            Assertions.assertEquals((Object)new CoordinatorResult(Collections.singletonList("record"), null), (Object)op.generateRecordsAndResult((Object)coordinatorShard));
            return CompletableFuture.completedFuture(null);
        });
        Mockito.when(executorService.submit((Runnable)ArgumentMatchers.any(Runnable.class))).thenAnswer(args -> {
            Assertions.assertTrue((boolean)executor.isScheduled(TASK_KEY));
            Runnable op = (Runnable)args.getArgument(0);
            op.run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean taskCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            taskCalled.set(true);
            return "Hello!";
        };
        AtomicBoolean operationCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskOperation taskOperation = (result, exception) -> {
            operationCalled.set(true);
            Assertions.assertEquals((Object)"Hello!", (Object)result);
            Assertions.assertNull((Object)exception);
            return new CoordinatorResult(Collections.singletonList("record"), null);
        };
        executor.schedule(TASK_KEY, taskRunnable, taskOperation);
        Assertions.assertTrue((boolean)taskCalled.get());
        Assertions.assertTrue((boolean)operationCalled.get());
    }

    @Test
    public void testTaskFailedLifecycle() {
        CoordinatorShard coordinatorShard = (CoordinatorShard)Mockito.mock(CoordinatorShard.class);
        CoordinatorRuntime runtime = (CoordinatorRuntime)Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService)Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl executor = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, runtime, executorService, WRITE_TIMEOUT);
        Mockito.when((Object)runtime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)TASK_KEY), (TopicPartition)ArgumentMatchers.eq((Object)SHARD_PARTITION), (Duration)ArgumentMatchers.eq((Object)WRITE_TIMEOUT), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenAnswer(args -> {
            CoordinatorRuntime.CoordinatorWriteOperation op = (CoordinatorRuntime.CoordinatorWriteOperation)args.getArgument(3);
            Assertions.assertEquals((Object)new CoordinatorResult(Collections.emptyList(), null), (Object)op.generateRecordsAndResult((Object)coordinatorShard));
            return CompletableFuture.completedFuture(null);
        });
        Mockito.when(executorService.submit((Runnable)ArgumentMatchers.any(Runnable.class))).thenAnswer(args -> {
            Runnable op = (Runnable)args.getArgument(0);
            op.run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean taskCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            taskCalled.set(true);
            throw new Exception("Oh no!");
        };
        AtomicBoolean operationCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskOperation taskOperation = (result, exception) -> {
            operationCalled.set(true);
            Assertions.assertNull((Object)result);
            Assertions.assertNotNull((Object)exception);
            Assertions.assertEquals((Object)"Oh no!", (Object)exception.getMessage());
            return new CoordinatorResult(Collections.emptyList(), null);
        };
        executor.schedule(TASK_KEY, taskRunnable, taskOperation);
        Assertions.assertTrue((boolean)taskCalled.get());
        Assertions.assertTrue((boolean)operationCalled.get());
    }

    @Test
    public void testTaskCancelledBeforeBeingExecuted() {
        CoordinatorRuntime runtime = (CoordinatorRuntime)Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService)Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl executor = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, runtime, executorService, WRITE_TIMEOUT);
        Mockito.when(executorService.submit((Runnable)ArgumentMatchers.any(Runnable.class))).thenAnswer(args -> {
            executor.cancel(TASK_KEY);
            Runnable op = (Runnable)args.getArgument(0);
            op.run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean taskCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            taskCalled.set(true);
            return null;
        };
        AtomicBoolean operationCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskOperation taskOperation = (result, exception) -> {
            operationCalled.set(true);
            return null;
        };
        executor.schedule(TASK_KEY, taskRunnable, taskOperation);
        Assertions.assertFalse((boolean)taskCalled.get());
        Assertions.assertFalse((boolean)operationCalled.get());
    }

    @Test
    public void testTaskCancelledAfterBeingExecutedButBeforeWriteOperationIsExecuted() {
        CoordinatorShard coordinatorShard = (CoordinatorShard)Mockito.mock(CoordinatorShard.class);
        CoordinatorRuntime runtime = (CoordinatorRuntime)Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService)Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl executor = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, runtime, executorService, WRITE_TIMEOUT);
        Mockito.when((Object)runtime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)TASK_KEY), (TopicPartition)ArgumentMatchers.eq((Object)SHARD_PARTITION), (Duration)ArgumentMatchers.eq((Object)WRITE_TIMEOUT), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenAnswer(args -> {
            executor.cancel(TASK_KEY);
            CoordinatorRuntime.CoordinatorWriteOperation op = (CoordinatorRuntime.CoordinatorWriteOperation)args.getArgument(3);
            Throwable ex = Assertions.assertThrows(RejectedExecutionException.class, () -> op.generateRecordsAndResult((Object)coordinatorShard));
            return FutureUtils.failedFuture((Throwable)ex);
        });
        Mockito.when(executorService.submit((Runnable)ArgumentMatchers.any(Runnable.class))).thenAnswer(args -> {
            Runnable op = (Runnable)args.getArgument(0);
            op.run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean taskCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            taskCalled.set(true);
            return "Hello!";
        };
        AtomicBoolean operationCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskOperation taskOperation = (result, exception) -> {
            operationCalled.set(true);
            return null;
        };
        executor.schedule(TASK_KEY, taskRunnable, taskOperation);
        Assertions.assertTrue((boolean)taskCalled.get());
        Assertions.assertFalse((boolean)operationCalled.get());
    }

    @Test
    public void testTaskSchedulingWriteOperationFailed() {
        CoordinatorRuntime runtime = (CoordinatorRuntime)Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService)Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl executor = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, runtime, executorService, WRITE_TIMEOUT);
        Mockito.when((Object)runtime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)TASK_KEY), (TopicPartition)ArgumentMatchers.eq((Object)SHARD_PARTITION), (Duration)ArgumentMatchers.eq((Object)WRITE_TIMEOUT), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)new Throwable("Oh no!")));
        Mockito.when(executorService.submit((Runnable)ArgumentMatchers.any(Runnable.class))).thenAnswer(args -> {
            Runnable op = (Runnable)args.getArgument(0);
            op.run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean taskCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            taskCalled.set(true);
            return "Hello!";
        };
        AtomicBoolean operationCalled = new AtomicBoolean(false);
        CoordinatorExecutor.TaskOperation taskOperation = (result, exception) -> {
            operationCalled.set(true);
            return new CoordinatorResult(Collections.emptyList(), null);
        };
        executor.schedule(TASK_KEY, taskRunnable, taskOperation);
        Assertions.assertTrue((boolean)taskCalled.get());
        Assertions.assertFalse((boolean)operationCalled.get());
        Assertions.assertFalse((boolean)executor.isScheduled(TASK_KEY));
    }
}

