package org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation;
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ColumnInfo;
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
import org.apache.linkis.engineconnplugin.flink.listener.RowsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/InsertOperation.class */
public class InsertOperation extends AbstractJobOperation {
    private static final Logger LOG = LoggerFactory.getLogger(InsertOperation.class);
    private final String statement;
    private final List<ColumnInfo> columnInfos;
    private boolean fetched;

    public InsertOperation(FlinkEngineConnContext flinkEngineConnContext, String str, String str2) {
        super(flinkEngineConnContext);
        this.fetched = false;
        this.statement = str;
        this.columnInfos = Collections.singletonList(ColumnInfo.create(str2, new BigIntType(false)));
    }

    @Override // org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation
    protected JobID submitJob() throws SqlExecutionException {
        return executeUpdateInternal(this.context.getExecutionContext());
    }

    @Override // org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation
    protected Optional<Tuple2<List<Row>, List<Boolean>>> fetchJobResults() {
        if (this.fetched) {
            return Optional.empty();
        }
        this.fetched = true;
        return Optional.of(Tuple2.of(Collections.singletonList(Row.of(new Object[]{-2L})), (Object) null));
    }

    @Override // org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation
    protected List<ColumnInfo> getColumnInfos() {
        return this.columnInfos;
    }

    private JobID executeUpdateInternal(ExecutionContext executionContext) throws SqlExecutionException {
        TableEnvironment tableEnvironment = executionContext.getTableEnvironment();
        try {
            TableResult tableResult = (TableResult) executionContext.wrapClassLoader(() -> {
                return tableEnvironment.executeSql(this.statement);
            });
            asyncNotify(tableResult);
            return ((JobClient) tableResult.getJobClient().get()).getJobID();
        } catch (Exception e) {
            LOG.error(String.format("Invalid SQL query, sql is: %s.", this.statement), e);
            throw new SqlExecutionException(FlinkErrorCodeSummary.INVALID_SQL_STATEMENT.getErrorDesc(), e);
        }
    }

    protected void asyncNotify(TableResult tableResult) {
        CompletableFuture.completedFuture(tableResult).thenApply(tableResult2 -> {
            CloseableIterator collect = tableResult2.collect();
            int i = 0;
            while (true) {
                int i2 = i;
                if (!collect.hasNext()) {
                    return Integer.valueOf(i2);
                }
                i = Integer.parseInt(((Row) collect.next()).getField(0).toString());
            }
        }).whenComplete((num, th) -> {
            if (th != null) {
                getFlinkStatusListeners().forEach(flinkStatusListener -> {
                    flinkStatusListener.onFailed("Error while submitting job.", th, RowsType.Affected());
                });
            } else {
                getFlinkStatusListeners().forEach(flinkStatusListener2 -> {
                    flinkStatusListener2.onSuccess(num.intValue(), RowsType.Affected());
                });
            }
        });
    }
}
