/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.client.gateway.local;

import java.io.IOException;
import java.net.InetAddress;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.CollectStreamTableSink;
import org.apache.flink.table.client.gateway.local.DynamicResult;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/**
 * Base class for dynamic results that are fed by a socket-backed collect sink.
 *
 * <p>The constructor opens a {@link SocketStreamIterator} and creates a
 * {@link CollectStreamTableSink} pointing at the iterator's bind address and port. Once
 * {@link #startRetrieval(Runnable)} is called, two threads are started: one drains the iterator
 * and hands every change record to {@link #processRecord(Tuple2)}, the other runs the submitted
 * program and records its failure, if any.
 *
 * @param <C> type of the cluster id
 */
public abstract class CollectStreamResult<C> implements DynamicResult<C> {

    private final TypeInformation<Row> outputType;
    private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
    private final CollectStreamTableSink collectTableSink;
    private final ResultRetrievalThread retrievalThread;
    private final JobMonitoringThread monitoringThread;

    /** Program executed by the monitoring thread; assigned in {@link #startRetrieval(Runnable)}. */
    private Runnable program;
    private C clusterId;

    /** Lock object made available to subclasses; this class itself never synchronizes on it. */
    protected final Object resultLock;

    /**
     * Failure recorded by the monitoring thread, or {@code null} if none was recorded.
     *
     * <p>Within this class the field is only read after {@code monitoringThread.isAlive()} has
     * returned {@code false}, which makes the write of the terminated thread visible.
     */
    protected SqlExecutionException executionException;

    /**
     * Opens the socket iterator and creates the table sink that sends to it.
     *
     * @param outputType type of the rows produced by the query
     * @param config execution config used to create the serializer for the socket records
     * @param gatewayAddress address handed to the socket iterator
     * @param gatewayPort port handed to the socket iterator
     * @throws SqlClientException if the socket iterator cannot be created
     */
    public CollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort) {
        this.outputType = outputType;
        this.resultLock = new Object();

        // Records travel over the socket as (Boolean, Row) tuples.
        final TypeInformation<Tuple2<Boolean, Row>> socketType = Types.TUPLE(Types.BOOLEAN, outputType);
        final TypeSerializer<Tuple2<Boolean, Row>> serializer = socketType.createSerializer(config);
        try {
            this.iterator = new SocketStreamIterator<>(gatewayPort, gatewayAddress, serializer);
        } catch (IOException e) {
            throw new SqlClientException("Could not start socket for result retrieval.", e);
        }

        // The sink is configured with the address and port the iterator reports.
        this.collectTableSink = new CollectStreamTableSink(this.iterator.getBindAddress(), this.iterator.getPort(), serializer);
        this.retrievalThread = new ResultRetrievalThread();
        this.monitoringThread = new JobMonitoringThread();
    }

    /**
     * Stores the cluster id; it may only be set once.
     *
     * @throws IllegalStateException if a non-null cluster id has already been set
     */
    @Override
    public void setClusterId(C clusterId) {
        if (this.clusterId != null) {
            throw new IllegalStateException("Cluster id is already present.");
        }
        this.clusterId = clusterId;
    }

    /** Returns the row type passed to the constructor. */
    @Override
    public TypeInformation<Row> getOutputType() {
        return this.outputType;
    }

    /**
     * Starts the retrieval thread and then the monitoring thread that runs {@code program}.
     *
     * @param program the program to run on the monitoring thread
     */
    @Override
    public void startRetrieval(Runnable program) {
        this.retrievalThread.start();
        // Assign before starting the monitoring thread: Thread.start() publishes the field to it.
        this.program = program;
        this.monitoringThread.start();
    }

    /** Returns the collect sink created in the constructor. */
    @Override
    public TableSink<?> getTableSink() {
        return this.collectTableSink;
    }

    /** Stops the retrieval loop, interrupts both threads and closes the socket iterator. */
    @Override
    public void close() {
        this.retrievalThread.isRunning = false;
        this.retrievalThread.interrupt();
        this.monitoringThread.interrupt();
        this.iterator.close();
    }

    /**
     * Decides what to report when no result is currently available.
     *
     * @return an empty result while the monitoring thread is alive, otherwise end-of-stream
     * @throws SqlExecutionException if the monitoring thread is not alive and recorded a failure
     */
    protected <T> TypedResult<T> handleMissingResult() {
        if (this.monitoringThread.isAlive()) {
            return TypedResult.empty();
        }
        if (this.executionException != null) {
            throw this.executionException;
        }
        return TypedResult.endOfStream();
    }

    /** Returns the retrieval thread's running flag, which is cleared when its loop ends or on close. */
    protected boolean isRetrieving() {
        return this.retrievalThread.isRunning;
    }

    /**
     * Handles a single change record read from the socket; invoked on the retrieval thread.
     *
     * @param var1 the (flag, row) record read from the socket iterator
     */
    protected abstract void processRecord(Tuple2<Boolean, Row> var1);

    /** Thread that drains the socket iterator and forwards each record to {@code processRecord}. */
    private class ResultRetrievalThread extends Thread {

        /** Cleared by {@code close()} to stop the loop, and by the thread itself when it finishes. */
        public volatile boolean isRunning = true;

        private ResultRetrievalThread() {
        }

        @Override
        public void run() {
            try {
                while (this.isRunning && CollectStreamResult.this.iterator.hasNext()) {
                    final Tuple2<Boolean, Row> change = CollectStreamResult.this.iterator.next();
                    CollectStreamResult.this.processRecord(change);
                }
            } catch (RuntimeException ignored) {
                // Deliberately best effort: a failure here simply ends retrieval.
                // NOTE(review): presumably meant for socket errors raised by the iterator (e.g.
                // after close()), but it also hides RuntimeExceptions thrown by processRecord().
            }
            // No more results: the stream ended, retrieval was stopped, or an error occurred.
            this.isRunning = false;
        }
    }

    /** Thread that runs the submitted program and records its failure for later reporting. */
    private class JobMonitoringThread extends Thread {

        private JobMonitoringThread() {
        }

        @Override
        public void run() {
            try {
                CollectStreamResult.this.program.run();
            } catch (SqlExecutionException e) {
                CollectStreamResult.this.executionException = e;
            } catch (RuntimeException e) {
                // Record unexpected failures as well; otherwise the thread would die silently and
                // handleMissingResult() would report a regular end of stream for a failed program.
                CollectStreamResult.this.executionException =
                    new SqlExecutionException("Unexpected error while executing the program.", e);
            }
        }
    }
}

