package org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.Row;
import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.client.result.AbstractResult;
import org.apache.linkis.engineconnplugin.flink.client.result.BatchResult;
import org.apache.linkis.engineconnplugin.flink.client.result.ChangelogResult;
import org.apache.linkis.engineconnplugin.flink.client.result.ResultUtil;
import org.apache.linkis.engineconnplugin.flink.client.result.TypedResult;
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation;
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ColumnInfo;
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException;
import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/SelectOperation.class */
public class SelectOperation extends AbstractJobOperation {
    private static final Logger LOG = LoggerFactory.getLogger(SelectOperation.class);
    private final String query;
    private AbstractResult<?, ?> result;
    private TableSchema resultSchema;
    private List<ColumnInfo> columnInfos;

    public SelectOperation(FlinkEngineConnContext flinkEngineConnContext, String str) {
        super(flinkEngineConnContext);
        this.query = str;
        this.noMoreResult = false;
    }

    @Override // org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation
    protected JobID submitJob() throws SqlExecutionException {
        JobID executeQueryInternal = executeQueryInternal(this.context.getExecutionContext(), this.query);
        List<TableColumn> tableColumns = this.resultSchema.getTableColumns();
        this.columnInfos = new ArrayList();
        for (TableColumn tableColumn : tableColumns) {
            this.columnInfos.add(ColumnInfo.create(tableColumn.getName(), tableColumn.getType().getLogicalType()));
        }
        return executeQueryInternal;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation
    public void cancelJobInternal() throws JobExecutionException {
        LOG.info("Start to cancel job {} and result retrieval.", getJobId());
        this.result.close();
        super.cancelJobInternal();
    }

    @Override // org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation
    protected Optional<Tuple2<List<Row>, List<Boolean>>> fetchJobResults() throws SqlExecutionException {
        Optional<Tuple2<List<Row>, List<Boolean>>> fetchStreamingResult;
        synchronized (this.lock) {
            if (this.result == null) {
                LOG.error("The job for this query has been canceled.");
                throw new SqlExecutionException("The job for this query has been canceled.");
            }
            fetchStreamingResult = this.result instanceof ChangelogResult ? fetchStreamingResult() : fetchBatchResult();
        }
        return fetchStreamingResult;
    }

    @Override // org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation
    protected List<ColumnInfo> getColumnInfos() {
        return this.columnInfos;
    }

    private Optional<Tuple2<List<Row>, List<Boolean>>> fetchBatchResult() throws SqlExecutionException {
        TypedResult<List<Row>> retrieveChanges = ((BatchResult) this.result).retrieveChanges();
        return retrieveChanges.getType() == TypedResult.ResultType.PAYLOAD ? Optional.of(Tuple2.of(retrieveChanges.getPayload(), (Object) null)) : Optional.of(Tuple2.of(Collections.emptyList(), (Object) null));
    }

    private Optional<Tuple2<List<Row>, List<Boolean>>> fetchStreamingResult() throws SqlExecutionException {
        TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges = ((ChangelogResult) this.result).retrieveChanges();
        if (retrieveChanges.getType() != TypedResult.ResultType.EOS && retrieveChanges.getType() == TypedResult.ResultType.PAYLOAD) {
            List<Tuple2<Boolean, Row>> payload = retrieveChanges.getPayload();
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (Tuple2<Boolean, Row> tuple2 : payload) {
                arrayList.add(tuple2.f1);
                arrayList2.add(tuple2.f0);
            }
            return Optional.of(Tuple2.of(arrayList, arrayList2));
        }
        return Optional.of(Tuple2.of(Collections.emptyList(), Collections.emptyList()));
    }

    private JobID executeQueryInternal(ExecutionContext executionContext, String str) throws SqlExecutionException {
        Table createTable = createTable(executionContext, executionContext.getTableEnvironment(), str);
        boolean inStreamingMode = executionContext.getEnvironment().getExecution().inStreamingMode();
        this.resultSchema = removeTimeAttributes(createTable.getSchema());
        if (inStreamingMode) {
            this.result = ResultUtil.createChangelogResult(executionContext.getFlinkConfig(), executionContext.getEnvironment(), this.resultSchema, executionContext.getExecutionConfig());
        } else {
            this.result = ResultUtil.createBatchResult(this.resultSchema, executionContext.getExecutionConfig());
        }
        this.result.setFlinkListeners(getFlinkListeners());
        String format = String.format("_tmp_table_%s", UUID.randomUUID().toString().replace("-", ""));
        try {
            try {
                TableResult tableResult = (TableResult) executionContext.wrapClassLoader(tableEnvironmentInternal -> {
                    tableEnvironmentInternal.registerTableSinkInternal(format, this.result.getTableSink());
                    return createTable.executeInsert(format);
                });
                executionContext.wrapClassLoader(tableEnvironmentInternal2 -> {
                    return Boolean.valueOf(tableEnvironmentInternal2.dropTemporaryTable(format));
                });
                return (JobID) tableResult.getJobClient().map(jobClient -> {
                    JobID jobID = jobClient.getJobID();
                    LOG.info("Submit flink job: {} successfully.", jobID);
                    this.result.startRetrieval(jobClient);
                    return jobID;
                }).orElseThrow(() -> {
                    return new SqlExecutionException("No job is generated, please ask admin for help!");
                });
            } catch (Exception e) {
                this.result.close();
                LOG.error(String.format("Invalid SQL query, sql is %s.", str), e);
                throw new SqlExecutionException("Invalid SQL query.", e);
            }
        } catch (Throwable th) {
            executionContext.wrapClassLoader(tableEnvironmentInternal22 -> {
                return Boolean.valueOf(tableEnvironmentInternal22.dropTemporaryTable(format));
            });
            throw th;
        }
    }

    private Table createTable(ExecutionContext executionContext, TableEnvironment tableEnvironment, String str) throws SqlExecutionException {
        try {
            return (Table) executionContext.wrapClassLoader(() -> {
                return tableEnvironment.sqlQuery(str);
            });
        } catch (Exception e) {
            throw new SqlExecutionException("Invalid SQL statement.", e);
        }
    }

    private TableSchema removeTimeAttributes(TableSchema tableSchema) {
        TableSchema.Builder builder = TableSchema.builder();
        for (int i = 0; i < tableSchema.getFieldCount(); i++) {
            DataType dataType = tableSchema.getFieldDataTypes()[i];
            builder.field(tableSchema.getFieldNames()[i], DataTypeUtils.replaceLogicalType(dataType, LogicalTypeUtils.removeTimeAttributes(dataType.getLogicalType())));
        }
        return builder.build();
    }
}
