/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.client.gateway.local.result;

import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.CollectStreamTableSink;
import org.apache.flink.table.client.gateway.local.result.BasicResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/**
 * Base class for dynamic results whose records are pulled from a socket by a background thread.
 *
 * <p>The constructor opens a {@link SocketStreamIterator} and creates a
 * {@link CollectStreamTableSink} that is handed the iterator's bind address and port (so the sink
 * presumably writes to that socket). Once {@link #startRetrieval(JobClient)} has been called, a
 * retrieval thread forwards every record read from the iterator to
 * {@link #processRecord(Tuple2)}.
 *
 * @param <C> type parameter passed through to {@link BasicResult} and {@link DynamicResult}
 */
public abstract class CollectStreamResult<C> extends BasicResult<C> implements DynamicResult<C> {

    /** Socket-backed iterator from which the retrieval thread reads records. */
    private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;

    /** Sink returned by {@link #getTableSink()}; configured with the iterator's address and port. */
    private final CollectStreamTableSink collectTableSink;

    /** Background thread that drains {@link #iterator} into {@link #processRecord(Tuple2)}. */
    private final ResultRetrievalThread retrievalThread;

    /** Future of the job result; only assigned by {@link #startRetrieval(JobClient)}. */
    private CompletableFuture<JobExecutionResult> jobExecutionResultFuture;

    /** Lock object exposed to subclasses; it is not used within this class itself. */
    protected final Object resultLock;

    /** First failure reported by the job execution future, or {@code null} if none was recorded. */
    protected AtomicReference<SqlExecutionException> executionException = new AtomicReference<>();

    /**
     * Opens the socket iterator and creates the matching collect sink.
     *
     * @param tableSchema schema whose row type defines the second field of the transferred tuples
     * @param config execution config used to create the tuple serializer
     * @param gatewayAddress address handed to the socket iterator
     * @param gatewayPort port handed to the socket iterator
     * @throws SqlClientException if creating the socket iterator fails with an {@link IOException}
     */
    public CollectStreamResult(
            TableSchema tableSchema,
            ExecutionConfig config,
            InetAddress gatewayAddress,
            int gatewayPort) {
        this.resultLock = new Object();

        // The same serializer is shared by the receiving iterator and the sink.
        final TypeInformation<Tuple2<Boolean, Row>> socketType =
                Types.TUPLE(Types.BOOLEAN, tableSchema.toRowType());
        final TypeSerializer<Tuple2<Boolean, Row>> serializer = socketType.createSerializer(config);

        try {
            this.iterator = new SocketStreamIterator<>(gatewayPort, gatewayAddress, serializer);
        } catch (IOException e) {
            throw new SqlClientException("Could not start socket for result retrieval.", e);
        }

        // Hand the sink the address/port the iterator actually ended up with.
        this.collectTableSink =
                new CollectStreamTableSink(
                        this.iterator.getBindAddress(),
                        this.iterator.getPort(),
                        serializer,
                        tableSchema);
        this.retrievalThread = new ResultRetrievalThread();
    }

    /**
     * Starts the retrieval thread and begins tracking the job's execution result.
     *
     * <p>If the job result future completes exceptionally, the failure is wrapped in a
     * {@link SqlExecutionException} and stored in {@link #executionException}, unless an exception
     * has already been recorded there.
     *
     * @param jobClient client used to obtain the job execution result future
     */
    @Override
    public void startRetrieval(JobClient jobClient) {
        this.retrievalThread.start();

        this.jobExecutionResultFuture =
                jobClient
                        .getJobExecutionResult()
                        .whenComplete(
                                (unused, throwable) -> {
                                    if (throwable != null) {
                                        // Keep only the first recorded failure.
                                        this.executionException.compareAndSet(
                                                null,
                                                new SqlExecutionException(
                                                        "Error while retrieving result.",
                                                        throwable));
                                    }
                                });
    }

    /**
     * Returns the sink that was created together with the socket iterator.
     *
     * @return the collect sink of this result
     */
    @Override
    public TableSink<?> getTableSink() {
        return this.collectTableSink;
    }

    /** Signals the retrieval thread to stop, interrupts it, and closes the socket iterator. */
    @Override
    public void close() {
        this.retrievalThread.isRunning = false;
        this.retrievalThread.interrupt();
        this.iterator.close();
    }

    /**
     * Decides what to report when no record is available.
     *
     * <p>Must only be called after {@link #startRetrieval(JobClient)}; before that the job result
     * future is still {@code null} and this method fails with a {@link NullPointerException}.
     *
     * @param <T> payload type of the returned result
     * @return an empty result while the job result future is not done, otherwise end-of-stream
     * @throws SqlExecutionException if the job result future is done and a failure was recorded
     */
    protected <T> TypedResult<T> handleMissingResult() {
        // The outcome of the job is not known yet, so the caller has to ask again later.
        if (!this.jobExecutionResultFuture.isDone()) {
            return TypedResult.empty();
        }

        // Read the reference once so the value that was checked is the value that is thrown.
        final SqlExecutionException failure = this.executionException.get();
        if (failure != null) {
            throw failure;
        }

        return TypedResult.endOfStream();
    }

    /**
     * Reports whether the retrieval thread is still flagged as running.
     *
     * <p>The flag is initially {@code true}, i.e. this also returns {@code true} before
     * {@link #startRetrieval(JobClient)} has been called.
     *
     * @return {@code true} until the retrieval loop has ended or {@link #close()} was called
     */
    protected boolean isRetrieving() {
        return this.retrievalThread.isRunning;
    }

    /**
     * Handles a single record read from the socket; invoked on the retrieval thread.
     *
     * @param var1 the record received from the socket iterator
     */
    protected abstract void processRecord(Tuple2<Boolean, Row> var1);

    /** Thread that forwards records from the socket iterator to {@link #processRecord(Tuple2)}. */
    private class ResultRetrievalThread extends Thread {

        /** Cleared by {@link CollectStreamResult#close()} or when the retrieval loop ends. */
        public volatile boolean isRunning = true;

        @Override
        public void run() {
            try {
                while (this.isRunning && CollectStreamResult.this.iterator.hasNext()) {
                    final Tuple2<Boolean, Row> change = CollectStreamResult.this.iterator.next();
                    CollectStreamResult.this.processRecord(change);
                }
            } catch (RuntimeException ignored) {
                // Deliberately best-effort: presumably this covers socket failures surfaced by
                // the iterator (e.g. once close() has closed it) -- TODO confirm. Note that it
                // also hides RuntimeExceptions thrown by processRecord.
            } finally {
                // Always clear the flag, even if an Error escapes the loop, so that
                // isRetrieving() never reports a dead thread as still running.
                this.isRunning = false;
            }
        }
    }
}

