package org.apache.flink.table.gateway.service.operation;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.internal.StaticResultProvider;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.ResultSetImpl;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.api.utils.ThreadUtils;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlCancelException;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/gateway/service/operation/OperationManagerTest.class */
class OperationManagerTest {
    private static final ExecutorService EXECUTOR_SERVICE = ThreadUtils.newThreadPool(5, 500, 600000, "operation-manager-test");
    private static OperationManager operationManager;
    private static ResultSet defaultResultSet;
    private final ThreadFactory threadFactory = new ExecutorThreadFactory("SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);

    OperationManagerTest() {
    }

    @BeforeEach
    void setUp() {
        operationManager = new OperationManager(EXECUTOR_SERVICE);
        defaultResultSet = new ResultSetImpl(ResultSet.ResultType.PAYLOAD, 1L, ResolvedSchema.of(new Column[]{Column.physical("id", DataTypes.BIGINT())}), Collections.singletonList(GenericRowData.of(new Object[]{1L})), StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER, false, (JobID) null, ResultKind.SUCCESS_WITH_CONTENT);
    }

    @AfterEach
    void cleanEach() {
        operationManager.close();
    }

    @AfterAll
    static void cleanUp() {
        EXECUTOR_SERVICE.shutdown();
    }

    @Test
    void testRunOperationAsynchronously() throws Exception {
        OperationHandle submitOperation = operationManager.submitOperation(() -> {
            return defaultResultSet;
        });
        Assertions.assertThat(operationManager.getOperationInfo(submitOperation).getStatus()).isNotEqualTo(OperationStatus.ERROR);
        Assertions.assertThat(operationManager.getOperationResultSchema(submitOperation)).isEqualTo(ResolvedSchema.of(new Column[]{Column.physical("id", DataTypes.BIGINT())}));
        Assertions.assertThat(operationManager.getOperationInfo(submitOperation).getStatus()).isEqualTo(OperationStatus.FINISHED);
    }

    @Test
    void testRunOperationSynchronously() throws Exception {
        OperationHandle submitOperation = operationManager.submitOperation(() -> {
            return defaultResultSet;
        });
        operationManager.awaitOperationTermination(submitOperation);
        Assertions.assertThat(operationManager.getOperationInfo(submitOperation).getStatus()).isEqualTo(OperationStatus.FINISHED);
        Assertions.assertThat(operationManager.fetchResults(submitOperation, 0L, Integer.MAX_VALUE)).isEqualTo(defaultResultSet);
    }

    @Test
    void testCancelOperation() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        OperationHandle submitOperation = operationManager.submitOperation(() -> {
            countDownLatch.await();
            return defaultResultSet;
        });
        this.threadFactory.newThread(() -> {
            operationManager.cancelOperation(submitOperation);
        }).start();
        operationManager.awaitOperationTermination(submitOperation);
        Assertions.assertThat(operationManager.getOperationInfo(submitOperation).getStatus()).isEqualTo(OperationStatus.CANCELED);
    }

    @Test
    void testCancelUninterruptedOperation() throws Exception {
        AtomicReference atomicReference = new AtomicReference(false);
        OperationHandle submitOperation = operationManager.submitOperation(() -> {
            while (true) {
                atomicReference.compareAndSet(false, true);
            }
        });
        atomicReference.getClass();
        CommonTestUtils.waitUtil(atomicReference::get, Duration.ofSeconds(10L), "Failed to start up the task.");
        Assertions.assertThatThrownBy(() -> {
            operationManager.cancelOperation(submitOperation);
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlCancelException.class, String.format("Operation '%s' did not react to \"Future.cancel(true)\" and is stuck for %s seconds in method.\n", submitOperation, 5))});
        Assertions.assertThat(operationManager.getOperationInfo(submitOperation).getStatus()).isEqualTo(OperationStatus.CANCELED);
    }

    @Test
    void testCloseUninterruptedOperation() throws Exception {
        AtomicReference atomicReference = new AtomicReference(false);
        for (int i = 0; i < 10; i++) {
            this.threadFactory.newThread(() -> {
                operationManager.submitOperation(() -> {
                    while (true) {
                        atomicReference.compareAndSet(false, true);
                    }
                });
            }).start();
        }
        atomicReference.getClass();
        CommonTestUtils.waitUtil(atomicReference::get, Duration.ofSeconds(10L), "Failed to start up the task.");
        Assertions.assertThatThrownBy(() -> {
            operationManager.close();
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlCancelException.class)});
        Assertions.assertThat(operationManager.getOperationCount()).isEqualTo(0);
    }

    @Test
    void testCloseOperation() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        OperationHandle submitOperation = operationManager.submitOperation(() -> {
            countDownLatch.await();
            return defaultResultSet;
        });
        this.threadFactory.newThread(() -> {
            operationManager.closeOperation(submitOperation);
        }).start();
        operationManager.awaitOperationTermination(submitOperation);
        Assertions.assertThatThrownBy(() -> {
            operationManager.getOperation(submitOperation);
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlGatewayException.class, String.format("Can not find the submitted operation in the OperationManager with the %s.", submitOperation))});
    }

    @Test
    void testRunOperationSynchronouslyWithError() {
        OperationHandle submitOperation = operationManager.submitOperation(() -> {
            throw new SqlExecutionException("Execution error.");
        });
        Assertions.assertThatThrownBy(() -> {
            operationManager.awaitOperationTermination(submitOperation);
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlExecutionException.class, "Execution error.")});
        Assertions.assertThat(operationManager.getOperationInfo(submitOperation).getStatus()).isEqualTo(OperationStatus.ERROR);
    }
}
