package org.apache.wayang.jdbc.execution;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.operators.TableSource;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.platform.ExecutionState;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.ExecutorTemplate;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
import org.apache.wayang.jdbc.operators.JdbcExecutionOperator;
import org.apache.wayang.jdbc.operators.JdbcFilterOperator;
import org.apache.wayang.jdbc.operators.JdbcProjectionOperator;
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;

/* loaded from: input_file:org/apache/wayang/jdbc/execution/JdbcExecutor.class */
/**
 * Executor that turns an {@link ExecutionStage} consisting of a {@link TableSource}, optional
 * {@link JdbcFilterOperator}s and at most one {@link JdbcProjectionOperator} into a single SQL
 * query. The query is not run here: it is stored in the {@link SqlQueryChannel.Instance} of the
 * last JDBC task of the stage, which is then registered with the {@link ExecutionState}.
 */
public class JdbcExecutor extends ExecutorTemplate {

    /** Platform this executor was created for; also returned by {@link #getPlatform()}. */
    private final JdbcPlatformTemplate platform;

    /** JDBC connection opened in the constructor and closed in {@link #dispose()}. */
    private final Connection connection;

    private final Logger logger = LogManager.getLogger(getClass());

    /** Handed to the operators when they build their SQL clauses. */
    private final FunctionCompiler functionCompiler = new FunctionCompiler();

    /**
     * Creates the executor and opens the JDBC connection described by the job's configuration.
     *
     * @param jdbcPlatformTemplate platform that provides the database descriptor
     * @param job                  job whose configuration and cross-platform executor are used
     */
    public JdbcExecutor(JdbcPlatformTemplate jdbcPlatformTemplate, Job job) {
        super(job.getCrossPlatformExecutor());
        this.platform = jdbcPlatformTemplate;
        this.connection = this.platform.createDatabaseDescriptor(job.getConfiguration()).createJdbcConnection();
    }

    /**
     * Compiles the given stage into one SQL query and registers the channel instance carrying it.
     *
     * <p>The stage is walked from its single start task (a {@link TableSource}) along the
     * single-consumer chain of JDBC tasks. Filters contribute {@code WHERE} conditions, a
     * projection contributes the {@code SELECT} list ({@code *} if there is none).
     *
     * @param executionStage      the stage to compile
     * @param optimizationContext supplies the operator contexts for the channel instances
     * @param executionState      receives the channel instance that holds the SQL query
     * @throws WayangException if the chain contains a JDBC task that is neither a filter nor a projection
     */
    public void execute(ExecutionStage executionStage, OptimizationContext optimizationContext, ExecutionState executionState) {
        final Collection<?> startTasks = executionStage.getStartTasks();
        final Collection<?> terminalTasks = executionStage.getTerminalTasks();

        // Verify that this stage has a shape we can handle.
        assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported";
        final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0];
        assert terminalTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";
        assert startTask.getOperator() instanceof TableSource : "Invalid JDBC stage: Start task has to be a TableSource";

        final TableSource tableSource = (TableSource) startTask.getOperator();
        SqlQueryChannel.Instance tipChannelInstance = instantiateOutboundChannel(startTask, optimizationContext);
        final Collection<ExecutionTask> filterTasks = new ArrayList<>(4);
        ExecutionTask projectionTask = null;
        final Set<?> allTasks = executionStage.getAllTasks();
        assert allTasks.size() <= 3;

        // Follow the chain of JDBC tasks inside the stage and sort them by kind.
        ExecutionTask nextTask = findJdbcExecutionOperatorTaskInStage(startTask, executionStage);
        while (nextTask != null) {
            if (nextTask.getOperator() instanceof JdbcFilterOperator) {
                filterTasks.add(nextTask);
            } else if (nextTask.getOperator() instanceof JdbcProjectionOperator) {
                // Only one projection per stage is expected.
                assert projectionTask == null;
                projectionTask = nextTask;
            } else {
                throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString()));
            }

            // Advance the tip channel instance, keeping the lineage linked to its predecessor.
            tipChannelInstance = instantiateOutboundChannel(nextTask, optimizationContext, tipChannelInstance);
            nextTask = findJdbcExecutionOperatorTaskInStage(nextTask, executionStage);
        }

        // Assemble the query: table first, then conditions, then the projection.
        final String tableName = getSqlClause(tableSource);
        final Collection<String> conditions = filterTasks.stream()
                .map(task -> getSqlClause(task.getOperator()))
                .collect(Collectors.toList());
        final String projection = projectionTask == null ? "*" : getSqlClause(projectionTask.getOperator());
        tipChannelInstance.setSqlQuery(createSqlQuery(tableName, conditions, projection));

        executionState.register(tipChannelInstance);
    }

    /**
     * Finds the single consumer of the given task's only output channel, provided that consumer
     * belongs to the same stage and runs a {@link JdbcExecutionOperator}.
     *
     * @param executionTask  task whose successor is looked up
     * @param executionStage stage the successor must belong to
     * @return the succeeding JDBC task, or {@code null} if the consumer is in another stage or is
     *         not a JDBC operator
     */
    private ExecutionTask findJdbcExecutionOperatorTaskInStage(ExecutionTask executionTask, ExecutionStage executionStage) {
        assert executionTask.getNumOuputChannels() == 1;
        final ExecutionTask consumer =
                (ExecutionTask) WayangCollections.getSingle(executionTask.getOutputChannel(0).getConsumers());
        final boolean isJdbcTaskOfStage =
                consumer.getStage() == executionStage && consumer.getOperator() instanceof JdbcExecutionOperator;
        return isJdbcTaskOfStage ? consumer : null;
    }

    /**
     * Instantiates the {@link SqlQueryChannel} that is the only output channel of the given task.
     *
     * @param executionTask       task whose output channel is instantiated
     * @param optimizationContext supplies the operator context of the task's operator
     * @return the new channel instance
     */
    private SqlQueryChannel.Instance instantiateOutboundChannel(ExecutionTask executionTask, OptimizationContext optimizationContext) {
        assert executionTask.getNumOuputChannels() == 1 : String.format("Illegal task: %s.", executionTask);
        assert executionTask.getOutputChannel(0) instanceof SqlQueryChannel : String.format("Illegal task: %s.", executionTask);

        final SqlQueryChannel outputChannel = (SqlQueryChannel) executionTask.getOutputChannel(0);
        // NOTE(review): `m0createInstance` looks like a decompiler-renamed `createInstance` — confirm
        // against the SqlQueryChannel sources before building against the real library.
        return outputChannel.m0createInstance((Executor) this, optimizationContext.getOperatorContext(executionTask.getOperator()), 0);
    }

    /**
     * Same as {@link #instantiateOutboundChannel(ExecutionTask, OptimizationContext)}, but also
     * registers the lineage of {@code instance} as predecessor of the new instance's lineage.
     *
     * @param executionTask       task whose output channel is instantiated
     * @param optimizationContext supplies the operator context of the task's operator
     * @param instance            channel instance that precedes the new one
     * @return the new channel instance
     */
    private SqlQueryChannel.Instance instantiateOutboundChannel(ExecutionTask executionTask, OptimizationContext optimizationContext, SqlQueryChannel.Instance instance) {
        final SqlQueryChannel.Instance successor = instantiateOutboundChannel(executionTask, optimizationContext);
        successor.getLineage().addPredecessor(instance.getLineage());
        return successor;
    }

    /**
     * Builds {@code SELECT <projection> FROM <tableName> [WHERE c1 AND c2 ...];}.
     *
     * @param tableName  text placed after {@code FROM}
     * @param conditions conditions joined with {@code AND}; no {@code WHERE} part is emitted if empty
     * @param projection text placed after {@code SELECT}
     * @return the SQL query, terminated by a semicolon
     */
    protected String createSqlQuery(String tableName, Collection<String> conditions, String projection) {
        final StringBuilder query = new StringBuilder(1000);
        query.append("SELECT ").append(projection).append(" FROM ").append(tableName);
        if (!conditions.isEmpty()) {
            query.append(" WHERE ").append(String.join(" AND ", conditions));
        }
        return query.append(';').toString();
    }

    /**
     * Asks the given operator for its SQL clause.
     *
     * @param operator must be a {@link JdbcExecutionOperator}
     * @return the clause created by the operator
     */
    private String getSqlClause(Operator operator) {
        return ((JdbcExecutionOperator) operator).createSqlClause(this.connection, this.functionCompiler);
    }

    /** Closes the JDBC connection; a failure to close is logged and not propagated. */
    public void dispose() {
        try {
            this.connection.close();
        } catch (SQLException e) {
            this.logger.error("Could not close JDBC connection to PostgreSQL correctly.", e);
        }
    }

    /** @return the platform this executor was created with */
    public Platform getPlatform() {
        return this.platform;
    }

    /**
     * Writes a result set to the single path of the given file channel instance: columns are
     * separated by tabs, rows by line feeds, and there is no trailing line feed.
     *
     * @param instance  file channel instance that names the output path
     * @param resultSet result set to write; it is consumed from its current position
     * @throws IOException  if writing fails
     * @throws SQLException if reading the result set fails
     */
    private void saveResult(FileChannel.Instance instance, ResultSet resultSet) throws IOException, SQLException {
        final FileSystem fileSystem = (FileSystem) FileSystems.getFileSystem(instance.getSinglePath()).get();
        // try-with-resources so the writer is closed even when reading or writing fails midway.
        try (OutputStreamWriter writer = new OutputStreamWriter(fileSystem.create(instance.getSinglePath()))) {
            final ResultSetMetaData metaData = resultSet.getMetaData();
            final int columnCount = metaData.getColumnCount();
            boolean firstRow = true;
            while (resultSet.next()) {
                // Emit the row separator before every row but the first. This avoids
                // ResultSet.isLast(), whose support is optional for forward-only result sets.
                if (!firstRow) {
                    writer.write('\n');
                }
                firstRow = false;
                for (int column = 1; column <= columnCount; column++) {
                    if (column > 1) {
                        writer.write('\t');
                    }
                    // getString yields null for SQL NULL, which Writer.write(String) rejects with a
                    // NullPointerException; such values are written as an empty field instead.
                    final String value = resultSet.getString(column);
                    if (value != null) {
                        writer.write(value);
                    }
                }
            }
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }
}
