package org.apache.zeppelin.flink.sql;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.types.Row;
import org.apache.zeppelin.flink.FlinkShims;
import org.apache.zeppelin.flink.JobManager;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.tabledata.TableDataUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.class */
/**
 * Base class for streaming SQL jobs whose retract-stream results are shipped back to this
 * process over a socket and rendered periodically.
 *
 * <p>Work is split over three threads:
 * <ul>
 *   <li>the caller of {@link #run(Table, String)}, which registers the collect sink, executes
 *       the job and returns the final result;</li>
 *   <li>a {@link ResultRetrievalThread}, which drains {@link #iterator} and forwards every
 *       change to {@link #processInsert(Row)} / {@link #processDelete(Row)} while holding
 *       {@link #resultLock};</li>
 *   <li>a single-threaded {@link #refreshScheduler}, which runs {@link RefreshTask} at a fixed
 *       rate and calls {@link #refresh(InterpreterContext)} while holding {@link #resultLock}.</li>
 * </ul>
 *
 * <p>{@link #refreshScheduler} is created once per instance and shut down at the end of
 * {@code run}; an instance is therefore presumably meant to be used for a single run.
 */
public abstract class AbstractStreamSqlJob {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamSqlJob.class);

    /** Counter used to generate unique sink table names for {@link #run(String)}. */
    private static final AtomicInteger SQL_INDEX = new AtomicInteger(0);

    protected StreamExecutionEnvironment senv;
    protected TableEnvironment stenv;
    private Table table;
    protected JobManager jobManager;
    protected InterpreterContext context;

    /** Schema of the result table with time-indicator columns replaced by SQL timestamps. */
    protected TableSchema schema;

    /** Socket-backed iterator over the (isInsert, row) changes emitted by the job. */
    protected SocketStreamIterator<Tuple2<Boolean, Row>> iterator;

    protected int defaultParallelism;
    protected FlinkShims flinkShims;

    /** Monitor guarding result mutation (record processing) against periodic refreshes. */
    protected Object resultLock = new Object();

    /**
     * Refresh gate checked by {@link RefreshTask}: while {@code false} the task waits on
     * {@link #resultLock}. Whoever flips it back to {@code true} is expected to notify the
     * lock (done outside this class, presumably by subclasses).
     */
    protected volatile boolean enableToRefresh = true;

    protected ScheduledExecutorService refreshScheduler = Executors.newScheduledThreadPool(1);

    /**
     * Periodic task that re-renders the current result of a paragraph.
     *
     * <p>Decompiler note: this class was originally declared {@code private}.
     */
    public class RefreshTask implements Runnable {
        private final InterpreterContext context;

        RefreshTask(InterpreterContext interpreterContext) {
            this.context = interpreterContext;
        }

        /**
         * Waits until refreshing is enabled, then calls
         * {@link AbstractStreamSqlJob#refresh(InterpreterContext)} under {@code resultLock}.
         * Failures are logged and never propagated, so one failed refresh does not cancel
         * the remaining scheduled executions.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                synchronized (AbstractStreamSqlJob.this.resultLock) {
                    // Loop instead of a single check: Object.wait() may wake up spuriously.
                    while (!AbstractStreamSqlJob.this.enableToRefresh) {
                        AbstractStreamSqlJob.this.resultLock.wait();
                    }
                    LOGGER.debug("Refresh result of paragraph: {}", this.context.getParagraphId());
                    AbstractStreamSqlJob.this.refresh(this.context);
                }
            } catch (InterruptedException e) {
                // Expected when the scheduler is shut down via shutdownNow(); keep the
                // interrupt status for the executor instead of reporting an error.
                Thread.currentThread().interrupt();
                LOGGER.debug("Refresh task interrupted for paragraph: {}", this.context.getParagraphId());
            } catch (Exception e) {
                LOGGER.error("Fail to refresh task", e);
            }
        }
    }

    /**
     * Thread that pulls change records from {@link AbstractStreamSqlJob#iterator} until the
     * stream ends or {@link #cancel()} is called, then shuts down the refresh scheduler.
     *
     * <p>Decompiler note: this class was originally declared {@code private}.
     */
    public class ResultRetrievalThread extends Thread {
        private final ScheduledExecutorService refreshExecutorService;
        volatile boolean isRunning = true;

        ResultRetrievalThread(ScheduledExecutorService scheduledExecutorService) {
            this.refreshExecutorService = scheduledExecutorService;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (this.isRunning && AbstractStreamSqlJob.this.iterator.hasNext()) {
                    try {
                        AbstractStreamSqlJob.this.processRecord(AbstractStreamSqlJob.this.iterator.next());
                    } catch (Throwable th) {
                        // A single bad record must not stop the retrieval loop.
                        LOGGER.error("Fail to process record", th);
                    }
                }
                this.isRunning = false;
                // NOTE(review): hasNext() may consult the underlying stream again; any
                // failure it raises here is handled by the catch below.
                LOGGER.info("ResultRetrieval Thread is done, isRunning={}, hasNext={}",
                        this.isRunning, AbstractStreamSqlJob.this.iterator.hasNext());
                LOGGER.info("Final Result: " + AbstractStreamSqlJob.this.buildResult());
            } catch (Throwable th) {
                // hasNext() is evaluated outside the per-record try; without this handler a
                // stream failure would kill the thread silently and skip the shutdown below.
                LOGGER.error("Fail to retrieve result", th);
            } finally {
                this.isRunning = false;
                this.refreshExecutorService.shutdownNow();
            }
        }

        /** Requests the retrieval loop to stop before fetching the next record. */
        public void cancel() {
            this.isRunning = false;
        }
    }

    /**
     * Creates a job bound to the given environments and interpreter context.
     *
     * @param streamExecutionEnvironment stream environment whose config is used to build serializers
     * @param tableEnvironment table environment used to run queries and register the sink
     * @param jobManager job manager handle, stored for subclasses
     * @param interpreterContext paragraph context providing local properties
     * @param i default parallelism used when the paragraph does not set {@code parallelism}
     * @param flinkShims version shim used for sink creation/registration and row formatting
     */
    public AbstractStreamSqlJob(StreamExecutionEnvironment streamExecutionEnvironment, TableEnvironment tableEnvironment, JobManager jobManager, InterpreterContext interpreterContext, int i, FlinkShims flinkShims) {
        this.senv = streamExecutionEnvironment;
        this.stenv = tableEnvironment;
        this.jobManager = jobManager;
        this.context = interpreterContext;
        this.defaultParallelism = i;
        this.flinkShims = flinkShims;
    }

    /**
     * Returns a copy of the schema in which every time-indicator column is typed as
     * {@code Types.SQL_TIMESTAMP}; all other columns keep their name and type.
     */
    private static TableSchema removeTimeAttributes(TableSchema tableSchema) {
        // Fetch the arrays once instead of on every loop iteration.
        String[] fieldNames = tableSchema.getFieldNames();
        TypeInformation<?>[] fieldTypes = tableSchema.getFieldTypes();
        TableSchema.Builder builder = TableSchema.builder();
        for (int i = 0; i < tableSchema.getFieldCount(); i++) {
            TypeInformation<?> fieldType = fieldTypes[i];
            builder.field(fieldNames[i], FlinkTypeFactory.isTimeIndicatorType(fieldType) ? Types.SQL_TIMESTAMP : fieldType);
        }
        return builder.build();
    }

    /** Returns a type identifier for this job; its meaning is defined by subclasses. */
    protected abstract String getType();

    /**
     * Runs the given SQL query as a streaming job, collecting into a generated sink table
     * named {@code UnnamedTable__<n>}.
     *
     * @param str SQL query text
     * @return the final result as produced by {@link #buildResult()}
     * @throws IOException if any step of running the job fails
     */
    public String run(String str) throws IOException {
        this.table = this.stenv.sqlQuery(str);
        return run(this.table, "UnnamedTable__" + SQL_INDEX.getAndIncrement());
    }

    /**
     * Runs the given table as a streaming job and blocks until it finishes.
     *
     * <p>Reads the paragraph-local properties {@code parallelism} (default: the configured
     * default parallelism; only logged here), {@code refreshInterval} in milliseconds
     * (default {@code 3000}) and {@code jobName} (default: the sink table name).
     *
     * @param table query result to collect
     * @param str name under which the collect sink is registered
     * @return the final result as produced by {@link #buildResult()}
     * @throws IOException if any step fails; the original failure is attached as the cause
     */
    public String run(Table table, String str) throws IOException {
        ResultRetrievalThread retrievalThread = null;
        SocketStreamIterator<Tuple2<Boolean, Row>> collector = null;
        boolean succeeded = false;
        try {
            this.table = table;
            int parallelism = Integer.parseInt(
                    this.context.getLocalProperties().getOrDefault("parallelism", String.valueOf(this.defaultParallelism)));
            this.schema = removeTimeAttributes(table.getSchema());
            checkTableSchema(this.schema);
            LOGGER.info("ResultTable Schema: {}", this.schema);

            RowTypeInfo rowTypeInfo = new RowTypeInfo(this.schema.getFieldTypes(), this.schema.getFieldNames());
            TypeInformation<Tuple2<Boolean, Row>> socketType = Types.TUPLE(Types.BOOLEAN, rowTypeInfo);
            TypeSerializer<Tuple2<Boolean, Row>> serializer = socketType.createSerializer(this.senv.getConfig());

            // Port 0 lets the iterator pick a free port; the sink is told where to connect.
            collector = new SocketStreamIterator<>(
                    0, InetAddress.getByName(RemoteInterpreterUtils.findAvailableHostAddress()), serializer);
            this.iterator = collector;
            LOGGER.debug("Collecting data at address: {}:{}", collector.getBindAddress(), collector.getPort());

            RetractStreamTableSink sink = (RetractStreamTableSink) this.flinkShims.getCollectStreamTableSink(
                    collector.getBindAddress(), collector.getPort(), serializer);
            sink = (RetractStreamTableSink) sink.configure(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
            registerSinkAndInsert(table, str, sink);

            long refreshInterval = Long.parseLong(
                    this.context.getLocalProperties().getOrDefault("refreshInterval", "3000"));
            this.refreshScheduler.scheduleAtFixedRate(
                    new RefreshTask(this.context), 1000L, refreshInterval, TimeUnit.MILLISECONDS);

            retrievalThread = new ResultRetrievalThread(this.refreshScheduler);
            retrievalThread.start();

            LOGGER.info("Run job: {}, parallelism: {}", str, parallelism);
            String jobName = this.context.getStringLocalProperty("jobName", str);
            this.stenv.execute(jobName);
            LOGGER.info("Flink Job is finished, jobName: {}", jobName);

            // Let the retrieval thread consume everything the job emitted.
            LOGGER.info("Waiting for retrieve thread to be done");
            retrievalThread.join();
            refresh(this.context);
            String finalResult = buildResult();
            LOGGER.info("Final Result: " + finalResult);
            succeeded = true;
            return finalResult;
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers before translating to IOException.
            Thread.currentThread().interrupt();
            LOGGER.error("Fail to run stream sql job", e);
            throw new IOException("Fail to run stream sql job", e);
        } catch (Exception e) {
            LOGGER.error("Fail to run stream sql job", e);
            throw new IOException("Fail to run stream sql job", e);
        } finally {
            this.refreshScheduler.shutdownNow();
            if (!succeeded) {
                abortRetrieval(retrievalThread, collector);
            }
        }
    }

    /**
     * Registers the sink in {@code default_catalog.default_database}, wires the table into
     * it, and restores the previously active catalog/database whether or not that succeeds.
     */
    private void registerSinkAndInsert(Table table, String sinkName, RetractStreamTableSink sink) {
        String currentCatalog = this.stenv.getCurrentCatalog();
        String currentDatabase = this.stenv.getCurrentDatabase();
        try {
            this.stenv.useCatalog("default_catalog");
            this.stenv.useDatabase("default_database");
            this.flinkShims.registerTableSink(this.stenv, sinkName, sink);
            table.insertInto(sinkName);
        } finally {
            this.stenv.useCatalog(currentCatalog);
            this.stenv.useDatabase(currentDatabase);
        }
    }

    /**
     * Failure-path cleanup: stops the retrieval loop and closes the collector socket so a
     * retrieval thread blocked on it is released instead of lingering after a failed run.
     */
    private void abortRetrieval(ResultRetrievalThread retrievalThread, SocketStreamIterator<Tuple2<Boolean, Row>> collector) {
        if (retrievalThread != null) {
            retrievalThread.cancel();
        }
        if (collector != null) {
            try {
                collector.close();
            } catch (Throwable th) {
                // Best-effort cleanup; the original failure is what the caller needs to see.
                LOGGER.warn("Fail to close result iterator", th);
            }
        }
    }

    /**
     * Hook for subclasses to validate the result schema before the job is set up.
     * The default implementation accepts every schema.
     *
     * @param tableSchema schema after time attributes were replaced
     * @throws Exception if the schema is not supported; {@code run} reports it as IOException
     */
    protected void checkTableSchema(TableSchema tableSchema) throws Exception {
    }

    /**
     * Applies one change record under {@link #resultLock}: {@code f0 == true} is an insert
     * of {@code f1}, {@code f0 == false} a delete of {@code f1}.
     */
    protected void processRecord(Tuple2<Boolean, Row> tuple2) {
        synchronized (this.resultLock) {
            if (tuple2.f0) {
                processInsert(tuple2.f1);
            } else {
                processDelete(tuple2.f1);
            }
        }
    }

    /** Handles an inserted row; called with {@link #resultLock} held. */
    protected abstract void processInsert(Row row);

    /** Handles a deleted (retracted) row; called with {@link #resultLock} held. */
    protected abstract void processDelete(Row row);

    /** Builds the textual result returned by {@code run} from the rows seen so far. */
    protected abstract String buildResult();

    /**
     * Formats rows as text: one line per row, columns normalized via
     * {@code TableDataUtils.normalizeColumn} and separated by tabs, each line terminated
     * by {@code \n}.
     *
     * <p>Decompiler note: access was originally {@code protected}.
     *
     * @param list rows to format
     * @return the formatted rows, or an empty string for an empty list
     */
    public String tableToString(List<Row> list) {
        StringBuilder sb = new StringBuilder();
        for (Row row : list) {
            sb.append(Arrays.stream(this.flinkShims.rowToString(row, this.table, this.stenv.getConfig()))
                    .map(TableDataUtils::normalizeColumn)
                    .collect(Collectors.joining("\t")));
            sb.append("\n");
        }
        return sb.toString();
    }

    /**
     * Re-renders the current result. Called by {@link RefreshTask} with {@link #resultLock}
     * held, and once more by {@code run} (without the lock) after retrieval has finished.
     *
     * @param interpreterContext paragraph context to render into
     * @throws Exception if rendering fails
     */
    protected abstract void refresh(InterpreterContext interpreterContext) throws Exception;
}
