package org.apache.flink.table.client.gateway.local.result;

import java.io.IOException;
import java.net.InetAddress;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.CollectStreamTableSink;
import org.apache.flink.table.client.gateway.local.ProgramDeployer;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/* loaded from: input_file:org/apache/flink/table/client/gateway/local/result/CollectStreamResult.class */
public abstract class CollectStreamResult<C> extends BasicResult<C> implements DynamicResult<C> {
    private final TypeInformation<Row> outputType;
    private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
    private final CollectStreamTableSink collectTableSink;
    private final CollectStreamResult<C>.ResultRetrievalThread retrievalThread;
    private final CollectStreamResult<C>.JobMonitoringThread monitoringThread;
    private ProgramDeployer<C> deployer;
    protected final Object resultLock = new Object();
    protected SqlExecutionException executionException;

    /* loaded from: input_file:org/apache/flink/table/client/gateway/local/result/CollectStreamResult$JobMonitoringThread.class */
    private class JobMonitoringThread extends Thread {
        private JobMonitoringThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                CollectStreamResult.this.deployer.run();
            } catch (SqlExecutionException e) {
                CollectStreamResult.this.executionException = e;
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/client/gateway/local/result/CollectStreamResult$ResultRetrievalThread.class */
    private class ResultRetrievalThread extends Thread {
        public volatile boolean isRunning;

        private ResultRetrievalThread() {
            this.isRunning = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.isRunning && CollectStreamResult.this.iterator.hasNext()) {
                try {
                    CollectStreamResult.this.processRecord((Tuple2) CollectStreamResult.this.iterator.next());
                } catch (RuntimeException e) {
                }
            }
            this.isRunning = false;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public CollectStreamResult(RowTypeInfo rowTypeInfo, ExecutionConfig executionConfig, InetAddress inetAddress, int i) {
        this.outputType = rowTypeInfo;
        TypeSerializer createSerializer = Types.TUPLE(new TypeInformation[]{Types.BOOLEAN, rowTypeInfo}).createSerializer(executionConfig);
        try {
            this.iterator = new SocketStreamIterator<>(i, inetAddress, createSerializer);
            this.collectTableSink = new CollectStreamTableSink(this.iterator.getBindAddress(), this.iterator.getPort(), createSerializer).configure(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
            this.retrievalThread = new ResultRetrievalThread();
            this.monitoringThread = new JobMonitoringThread();
        } catch (IOException e) {
            throw new SqlClientException("Could not start socket for result retrieval.", e);
        }
    }

    @Override // org.apache.flink.table.client.gateway.local.result.DynamicResult
    public TypeInformation<Row> getOutputType() {
        return this.outputType;
    }

    @Override // org.apache.flink.table.client.gateway.local.result.DynamicResult
    public void startRetrieval(ProgramDeployer<C> programDeployer) {
        this.retrievalThread.start();
        this.deployer = programDeployer;
        this.monitoringThread.start();
    }

    @Override // org.apache.flink.table.client.gateway.local.result.DynamicResult
    public TableSink<?> getTableSink() {
        return this.collectTableSink;
    }

    @Override // org.apache.flink.table.client.gateway.local.result.DynamicResult
    public void close() {
        this.retrievalThread.isRunning = false;
        this.retrievalThread.interrupt();
        this.monitoringThread.interrupt();
        this.iterator.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> TypedResult<T> handleMissingResult() {
        if (this.monitoringThread.isAlive()) {
            return TypedResult.empty();
        }
        if (this.executionException != null) {
            throw this.executionException;
        }
        return TypedResult.endOfStream();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isRetrieving() {
        return this.retrievalThread.isRunning;
    }

    protected abstract void processRecord(Tuple2<Boolean, Row> tuple2);
}
