package org.apache.wayang.jdbc.operators;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.JsonSerializable;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.json.WayangJsonObj;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;
import org.apache.wayang.java.operators.JavaExecutionOperator;
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;

/* loaded from: input_file:org/apache/wayang/jdbc/operators/SqlToStreamOperator.class */
public class SqlToStreamOperator extends UnaryToUnaryOperator<Record, Record> implements JavaExecutionOperator, JsonSerializable {
    private final JdbcPlatformTemplate jdbcPlatform;

    /* loaded from: input_file:org/apache/wayang/jdbc/operators/SqlToStreamOperator$ResultSetIterator.class */
    private static class ResultSetIterator implements Iterator<Record>, AutoCloseable {
        private ResultSet resultSet;
        private Record next;

        ResultSetIterator(Connection connection, String str) {
            try {
                this.resultSet = connection.createStatement().executeQuery(str);
                moveToNext();
            } catch (SQLException e) {
                close();
                throw new WayangException("Could not execute SQL.", e);
            }
        }

        private void moveToNext() {
            try {
                if (this.resultSet == null || !this.resultSet.next()) {
                    this.next = null;
                    close();
                } else {
                    int columnCount = this.resultSet.getMetaData().getColumnCount();
                    Object[] objArr = new Object[columnCount];
                    for (int i = 0; i < columnCount; i++) {
                        objArr[i] = this.resultSet.getObject(i + 1);
                    }
                    this.next = new Record(objArr);
                }
            } catch (SQLException e) {
                this.next = null;
                close();
                throw new WayangException("Exception while iterating the result set.", e);
            }
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.next != null;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public Record next() {
            Record record = this.next;
            moveToNext();
            return record;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            try {
            } catch (Throwable th) {
                LogManager.getLogger(getClass()).error("Could not close result set.", th);
            } finally {
                this.resultSet = null;
            }
            if (this.resultSet != null) {
                this.resultSet.close();
            }
        }
    }

    public SqlToStreamOperator(JdbcPlatformTemplate jdbcPlatformTemplate) {
        this(jdbcPlatformTemplate, DataSetType.createDefault(Record.class));
    }

    public SqlToStreamOperator(JdbcPlatformTemplate jdbcPlatformTemplate, DataSetType<Record> dataSetType) {
        super(dataSetType, dataSetType, false);
        this.jdbcPlatform = jdbcPlatformTemplate;
    }

    protected SqlToStreamOperator(SqlToStreamOperator sqlToStreamOperator) {
        super(sqlToStreamOperator);
        this.jdbcPlatform = sqlToStreamOperator.jdbcPlatform;
    }

    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, JavaExecutor javaExecutor, OptimizationContext.OperatorContext operatorContext) {
        SqlQueryChannel.Instance instance = (SqlQueryChannel.Instance) channelInstanceArr[0];
        StreamChannel.Instance instance2 = (StreamChannel.Instance) channelInstanceArr2[0];
        instance2.accept(StreamSupport.stream(Spliterators.spliteratorUnknownSize(new ResultSetIterator(((JdbcPlatformTemplate) instance.m2getChannel().getProducer().getPlatform()).createDatabaseDescriptor(javaExecutor.getConfiguration()).createJdbcConnection(), instance.getSqlQuery()), 0), false));
        ExecutionLineageNode executionLineageNode = new ExecutionLineageNode(operatorContext);
        executionLineageNode.add(LoadProfileEstimators.createFromSpecification(String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()), javaExecutor.getConfiguration()));
        executionLineageNode.addPredecessor(instance.getLineage());
        ExecutionLineageNode executionLineageNode2 = new ExecutionLineageNode(operatorContext);
        executionLineageNode2.add(LoadProfileEstimators.createFromSpecification(String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId()), javaExecutor.getConfiguration()));
        instance2.getLineage().addPredecessor(executionLineageNode2);
        return executionLineageNode.collectAndMark();
    }

    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        return Collections.singletonList(this.jdbcPlatform.getSqlQueryChannelDescriptor());
    }

    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        return Collections.singletonList(StreamChannel.DESCRIPTOR);
    }

    public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
        return Arrays.asList(String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()), String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId()));
    }

    public WayangJsonObj toJson() {
        return new WayangJsonObj().put("platform", this.jdbcPlatform.getClass().getCanonicalName());
    }

    public static SqlToStreamOperator fromJson(WayangJsonObj wayangJsonObj) {
        return new SqlToStreamOperator((JdbcPlatformTemplate) ReflectionUtils.evaluate(wayangJsonObj.getString("platform") + ".getInstance()"));
    }
}
