package org.apache.linkis.engineconnplugin.flink.client.result;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.AbstractID;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.JobExecutionException;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.listener.RowsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/result/BatchResult.class */
/**
 * Result holder for a batch query whose rows are collected through an accumulator.
 *
 * <p>The constructor creates a {@link CollectBatchTableSink} bound to a freshly generated
 * accumulator name. Once {@link #startRetrieval(JobClient)} has been called and the job result
 * becomes available, the rows are deserialized from that accumulator and handed out by
 * {@link #retrieveChanges()}.
 *
 * <p>{@code resultTable} and {@code allResultRetrieved} are only accessed while holding
 * {@code resultLock}; the first retrieval failure is kept in an {@link AtomicReference}.
 *
 * @param <C> type parameter forwarded unchanged to {@link AbstractResult}
 */
public class BatchResult<C> extends AbstractResult<C, Row> {
    private final Logger LOG = LoggerFactory.getLogger(getClass());

    /** Name of the accumulator shared between the sink and the result retrieval. */
    private final String accumulatorName = new AbstractID().toString();

    /** Guards {@link #resultTable} and {@link #allResultRetrieved}. */
    private final Object resultLock = new Object();

    /** First failure observed while waiting for or decoding the job result, if any. */
    private final AtomicReference<SqlExecutionException> executionException = new AtomicReference<>();

    private final CollectBatchTableSink tableSink;

    /** Deserialized rows; {@code null} until the job result has been accepted. */
    private List<Row> resultTable;

    /** Set once the rows have been handed out by {@link #retrieveChanges()}. */
    private boolean allResultRetrieved = false;

    /**
     * Consumes the finished job's {@link JobExecutionResult}: reads the serialized rows from the
     * accumulator, deserializes them, publishes them as the result table and notifies the
     * registered status listeners.
     */
    private class ResultRetrievalHandler implements Consumer<JobExecutionResult> {

        /**
         * Extracts and stores the collected rows.
         *
         * @param jobExecutionResult result of the finished job
         * @throws RuntimeException if the accumulator result is missing or cannot be deserialized;
         *     the original exception is attached as the cause
         */
        @Override
        public void accept(JobExecutionResult jobExecutionResult) {
            final ArrayList<byte[]> serializedRows =
                    jobExecutionResult.getAccumulatorResult(accumulatorName);
            if (serializedRows == null) {
                // A missing accumulator is not a serialization problem, so report it with its
                // own description instead of the deserialization message.
                final String description = FlinkErrorCodeSummary.NOT_RETRIEVE_RESULT.getErrorDesc();
                throw failure(description, new JobExecutionException(description));
            }

            final List<Row> rows;
            try {
                rows = SerializedListAccumulator.deserializeList(serializedRows, tableSink.getSerializer());
            } catch (IOException | ClassNotFoundException e) {
                throw failure("Serialization error while deserialize collected data.", e);
            }

            synchronized (resultLock) {
                resultTable = rows;
            }
            LOG.info("Accept the result, row is {}.", rows.size());
            getFlinkStatusListeners()
                    .forEach(listener -> listener.onSuccess(rows.size(), RowsType.Fetched()));
        }

        /**
         * Notifies every status listener about the failure and builds the exception to throw.
         *
         * @param message description passed to the listeners and used as the exception message
         * @param cause the underlying problem
         * @return a {@link RuntimeException} wrapping {@code cause}
         */
        private RuntimeException failure(String message, Exception cause) {
            getFlinkStatusListeners()
                    .forEach(listener -> listener.onFailed(message, cause, RowsType.Fetched()));
            return new RuntimeException(message, cause);
        }
    }

    /**
     * Creates the result together with the table sink that collects the rows.
     *
     * @param tableSchema schema handed to the collecting sink
     * @param rowTypeInfo row type used to create the row serializer
     * @param executionConfig configuration used when creating the serializer
     */
    public BatchResult(TableSchema tableSchema, RowTypeInfo rowTypeInfo, ExecutionConfig executionConfig) {
        this.tableSink = new CollectBatchTableSink(this.accumulatorName, rowTypeInfo.createSerializer(executionConfig), tableSchema);
    }

    /**
     * Registers the asynchronous pipeline that waits for the job result and decodes it.
     *
     * <p>If waiting for the result or decoding it fails, the first such failure is recorded and
     * later rethrown by {@link #retrieveChanges()}.
     *
     * @param jobClient client of the submitted job
     */
    @Override // org.apache.linkis.engineconnplugin.flink.client.result.Result
    public void startRetrieval(JobClient jobClient) {
        CompletableFuture.completedFuture(jobClient)
                .thenCompose(JobClient::getJobExecutionResult)
                .thenAccept(new ResultRetrievalHandler())
                .whenComplete((unused, throwable) -> {
                    if (throwable != null) {
                        // Only the first failure is kept.
                        executionException.compareAndSet(
                                null,
                                new SqlExecutionException(
                                        FlinkErrorCodeSummary.ERROR_SUBMITTING_JOB.getErrorDesc(), throwable));
                    }
                });
    }

    /**
     * Returns the collected rows exactly once.
     *
     * @return an empty result while the rows are not yet available, a payload with all rows on
     *     the first call after they arrived, and end-of-stream on every later call
     * @throws SqlExecutionException if retrieval of the job result failed
     */
    @Override // org.apache.linkis.engineconnplugin.flink.client.result.Result
    public TypedResult<List<Row>> retrieveChanges() throws SqlExecutionException {
        synchronized (this.resultLock) {
            final SqlExecutionException failure = this.executionException.get();
            if (failure != null) {
                throw failure;
            }
            if (this.resultTable == null) {
                return TypedResult.empty();
            }
            if (this.allResultRetrieved) {
                return TypedResult.endOfStream();
            }
            this.allResultRetrieved = true;
            return TypedResult.payload(this.resultTable);
        }
    }

    /**
     * @return the sink that collects the query's rows into the accumulator
     */
    @Override // org.apache.linkis.engineconnplugin.flink.client.result.Result
    public TableSink<?> getTableSink() {
        return this.tableSink;
    }

    /**
     * No-op: this class keeps no reference to the retrieval future, so there is nothing to
     * cancel or release here.
     */
    @Override // org.apache.linkis.engineconnplugin.flink.client.result.Result
    public void close() {
    }
}
