package org.apache.linkis.engineconnplugin.flink.client.result;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.gateway.local.CollectStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.listener.RowsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.class */
/**
 * Result of a streaming table program whose changelog ({@code Tuple2<Boolean, Row>} records) is
 * received over a socket and buffered until a caller drains it via {@link #retrieveChanges()}.
 *
 * <p>A dedicated {@link ResultRetrievalThread} reads records from a {@link SocketStreamIterator}
 * and appends them to {@code changeRecordBuffer}. When the buffer holds {@code maxBufferSize}
 * records the thread notifies the streaming result-set listeners and blocks until the buffer is
 * drained or the result is closed.
 *
 * <p>All access to {@code changeRecordBuffer} is guarded by {@code resultLock}.
 */
public class ChangelogResult extends AbstractResult<ApplicationId, Tuple2<Boolean, Row>> {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogResult.class);

    private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
    private final CollectStreamTableSink collectTableSink;
    private final ResultRetrievalThread retrievalThread;
    // Written by startRetrieval(), read by the retrieval thread and by callers of
    // retrieveChanges(); volatile so that every reader sees the assignment.
    private volatile CompletableFuture<JobExecutionResult> jobExecutionResultFuture;
    private final List<Tuple2<Boolean, Row>> changeRecordBuffer;
    private final int maxBufferSize;
    private final AtomicReference<SqlExecutionException> executionException = new AtomicReference<>();
    private final Object resultLock = new Object();

    /**
     * Thread that pulls records from the socket iterator into the buffer and, once the stream
     * and the job have finished, reports the final status to the status listeners.
     */
    public class ResultRetrievalThread extends Thread {
        volatile boolean isRunning;
        private boolean isStatusListenersNotified;

        private ResultRetrievalThread() {
            this.isRunning = true;
            this.isStatusListenersNotified = false;
        }

        @Override
        public void run() {
            try {
                final int rows = pullRecords();
                notifyPendingRecords();
                awaitJobCompletion();
                LOG.warn("executionException is", (Throwable) executionException.get());
                if (!isStatusListenersNotified) {
                    dealOrFailed(() -> notifyFinalStatus(rows));
                }
            } finally {
                // Always flip the flag, even if a listener throws, so isRetrieving() cannot
                // report a dead thread as still retrieving.
                isRunning = false;
            }
        }

        /**
         * Reads records until the stream ends, the thread is stopped, or reading fails.
         *
         * <p>The try block wraps the whole loop, including {@code hasNext()}, so that a socket
         * failure (for example after {@link ChangelogResult#close()}) ends the loop instead of
         * escaping {@link #run()} and skipping the listener notifications.
         *
         * @return the number of records handed to {@code processRecord}
         */
        private int pullRecords() {
            int rows = 0;
            try {
                while (isRunning && iterator.hasNext()) {
                    processRecord(iterator.next());
                    rows++;
                }
            } catch (Exception e) {
                LOG.warn(getName() + " has finished with an error, ignore it.", e);
            }
            return rows;
        }

        /** Tells the result-set listeners about records still sitting in the buffer. */
        private void notifyPendingRecords() {
            final int pending;
            // The buffer is a plain ArrayList that other threads mutate under resultLock.
            synchronized (resultLock) {
                pending = changeRecordBuffer.size();
            }
            if (pending > 0) {
                dealOrFailed(() -> getFlinkStreamingResultSetListeners()
                    .forEach(listener -> listener.onResultSetPulled(pending)));
            }
        }

        /** Blocks until the job result future completes; failures are logged and ignored. */
        private void awaitJobCompletion() {
            try {
                jobExecutionResultFuture.get();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                LOG.warn(getName() + " has finished with an error, ignore it.", e);
            } catch (Exception e) {
                LOG.warn(getName() + " has finished with an error, ignore it.", e);
            }
        }

        /** Reports failure if an execution exception was recorded, success otherwise. */
        private void notifyFinalStatus(int rows) {
            final SqlExecutionException failure = executionException.get();
            if (failure != null) {
                getFlinkStatusListeners().forEach(listener ->
                    listener.onFailed(ExceptionUtils.getRootCauseMessage(failure), failure, RowsType.Fetched()));
            } else {
                getFlinkStatusListeners().forEach(listener -> listener.onSuccess(rows, RowsType.Fetched()));
            }
        }

        /**
         * Runs a listener callback; if it throws, logs the error, remembers that the status
         * listeners have been notified and reports the failure to them.
         */
        private void dealOrFailed(Runnable action) {
            try {
                action.run();
            } catch (Exception e) {
                LOG.error("Listener execute failed!", e);
                isStatusListenersNotified = true;
                getFlinkStatusListeners().forEach(listener ->
                    listener.onFailed(ExceptionUtils.getRootCauseMessage(e), e, RowsType.Fetched()));
            }
        }
    }

    /**
     * Opens the receiving socket and creates the matching table sink.
     *
     * @param rowTypeInfo row type of the records sent by the sink
     * @param tableSchema schema handed to the collect sink
     * @param executionConfig configuration used to create the record serializer
     * @param inetAddress address the socket iterator is created with
     * @param port port the socket iterator is created with
     * @param maxBufferSize number of buffered records at which the retrieval thread blocks;
     *     values below 1 are treated as 1, because a non-positive limit would make the buffer
     *     permanently "full" and block the first record forever
     * @throws SqlExecutionException if the socket cannot be opened
     */
    public ChangelogResult(RowTypeInfo rowTypeInfo, TableSchema tableSchema, ExecutionConfig executionConfig, InetAddress inetAddress, int port, int maxBufferSize) throws SqlExecutionException {
        final TypeInformation<Tuple2<Boolean, Row>> socketType = Types.TUPLE(Types.BOOLEAN, rowTypeInfo);
        final TypeSerializer<Tuple2<Boolean, Row>> serializer = socketType.createSerializer(executionConfig);
        final SocketStreamIterator<Tuple2<Boolean, Row>> socketIterator;
        try {
            socketIterator = new SocketStreamIterator<>(port, inetAddress, serializer);
        } catch (IOException e) {
            throw new SqlExecutionException("Could not start socket for result retrieval.", e);
        }
        this.iterator = socketIterator;
        this.collectTableSink = new CollectStreamTableSink(socketIterator.getBindAddress(), socketIterator.getPort(), serializer, tableSchema);
        this.retrievalThread = new ResultRetrievalThread();
        this.changeRecordBuffer = new ArrayList<>();
        this.maxBufferSize = Math.max(1, maxBufferSize);
    }

    /**
     * Starts the retrieval thread and begins tracking the job's execution result.
     *
     * <p>The result future is assigned before the thread is started, so the thread can never
     * observe a {@code null} future when it waits for job completion.
     *
     * @param jobClient client of the submitted job
     */
    @Override
    public void startRetrieval(JobClient jobClient) {
        this.retrievalThread.setName(jobClient.getJobID().toHexString() + "-JobResult-Fetch-Thread");
        this.jobExecutionResultFuture = CompletableFuture.completedFuture(jobClient)
            .thenCompose(client -> client.getJobExecutionResult())
            .whenComplete((jobExecutionResult, failure) -> {
                if (failure == null) {
                    LOG.warn("throwable is null");
                } else {
                    LOG.warn("throwable is not null", failure);
                    // Only the first failure is kept.
                    this.executionException.compareAndSet(null, new SqlExecutionException("Error while submitting job.", failure));
                }
            });
        this.retrievalThread.start();
    }

    /**
     * Drains the buffered change records.
     *
     * @return the buffered records as payload if any exist; otherwise {@code TypedResult.empty()}
     *     while retrieval or the job is still in progress, or {@code TypedResult.endOfStream()}
     *     once the job has finished without error
     * @throws SqlExecutionException if the job failed and no records can be returned
     */
    @Override
    public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() throws SqlExecutionException {
        synchronized (this.resultLock) {
            if (isRetrieving() && this.executionException.get() == null) {
                if (this.changeRecordBuffer.isEmpty()) {
                    return TypedResult.empty();
                }
                return TypedResult.payload(drainBuffer());
            }
            if (isRetrieving() || this.changeRecordBuffer.isEmpty()) {
                return handleMissingResult();
            }
            // Retrieval has stopped but records are left: hand them out before ending.
            return TypedResult.payload(drainBuffer());
        }
    }

    @Override
    public TableSink<?> getTableSink() {
        return this.collectTableSink;
    }

    /**
     * Stops retrieval, wakes the retrieval thread if it is blocked on a full buffer, and closes
     * the socket iterator.
     */
    @Override
    public void close() {
        this.retrievalThread.isRunning = false;
        // The flag is cleared before notifying so a woken waiter sees it and leaves the loop.
        synchronized (this.resultLock) {
            this.resultLock.notifyAll();
        }
        this.iterator.close();
    }

    /** Copies and clears the buffer, then wakes a producer waiting for space. Caller holds the lock. */
    private List<Tuple2<Boolean, Row>> drainBuffer() {
        final List<Tuple2<Boolean, Row>> changes = new ArrayList<>(this.changeRecordBuffer);
        this.changeRecordBuffer.clear();
        this.resultLock.notifyAll();
        return changes;
    }

    /**
     * Decides what to return when no records can be handed out: empty while the job result is
     * not available yet, the recorded failure if there is one, end-of-stream otherwise.
     */
    private <T> TypedResult<T> handleMissingResult() throws SqlExecutionException {
        final CompletableFuture<JobExecutionResult> resultFuture = this.jobExecutionResultFuture;
        // The future is null when retrieval was never started.
        if (resultFuture == null || !resultFuture.isDone()) {
            return TypedResult.empty();
        }
        final SqlExecutionException failure = this.executionException.get();
        if (failure != null) {
            throw failure;
        }
        return TypedResult.endOfStream();
    }

    private boolean isRetrieving() {
        return this.retrievalThread.isRunning;
    }

    /**
     * Appends one change record to the buffer, blocking while the buffer is full.
     *
     * <p>While the buffer is full, the result-set listeners are notified and the calling thread
     * waits on {@code resultLock}. Waiting ends when the buffer has been drained, when retrieval
     * is stopped, or when the thread is interrupted. An interrupt is treated as a stop request:
     * the interrupt flag is restored and retrieval is marked as stopped. The record is always
     * appended so it is not lost.
     *
     * @param change the record to buffer
     */
    public void processRecord(Tuple2<Boolean, Row> change) {
        synchronized (this.resultLock) {
            while (isRetrieving() && this.changeRecordBuffer.size() >= this.maxBufferSize) {
                getFlinkStreamingResultSetListeners().forEach(listener ->
                    listener.onResultSetPulled(this.changeRecordBuffer.size()));
                // A listener may have drained the buffer from this very thread; waiting then
                // would block with nobody left to call notifyAll().
                if (this.changeRecordBuffer.size() < this.maxBufferSize) {
                    break;
                }
                try {
                    this.resultLock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.retrievalThread.isRunning = false;
                    break;
                }
            }
            this.changeRecordBuffer.add(change);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Accept the streaming result, row is {}.", change.f1);
        }
    }
}
