/*
 * Decompiled with CFR 0.152.
 */
package cn.tenmg.flink.jobs.operator;

import cn.tenmg.dsl.NamedScript;
import cn.tenmg.dsl.ParamsParser;
import cn.tenmg.dsl.Script;
import cn.tenmg.dsl.parser.JDBCParamsParser;
import cn.tenmg.dsl.utils.DSLUtils;
import cn.tenmg.dsl.utils.StringUtils;
import cn.tenmg.flink.jobs.context.FlinkJobsContext;
import cn.tenmg.flink.jobs.jdbc.SQLExecutor;
import cn.tenmg.flink.jobs.jdbc.executor.ExecuteLargeUpdateSQLExecutor;
import cn.tenmg.flink.jobs.jdbc.executor.ExecuteSQLExecutor;
import cn.tenmg.flink.jobs.jdbc.executor.ExecuteUpdateSQLExecutor;
import cn.tenmg.flink.jobs.jdbc.executor.GetSQLExecutor;
import cn.tenmg.flink.jobs.jdbc.executor.ReadOnlySQLExecutor;
import cn.tenmg.flink.jobs.jdbc.executor.SelectSQLExecutor;
import cn.tenmg.flink.jobs.model.Jdbc;
import cn.tenmg.flink.jobs.operator.AbstractOperator;
import cn.tenmg.flink.jobs.utils.JDBCUtils;
import cn.tenmg.flink.jobs.utils.JSONUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that runs the SQL script of a {@link Jdbc} job definition over a plain JDBC connection.
 *
 * <p>The script is first parsed as DSL using the job parameters, converted to a JDBC statement
 * with positional parameters, and then handed to the {@link SQLExecutor} registered for the
 * method name configured on the {@link Jdbc} definition.
 */
public class JdbcOperator
extends AbstractOperator<Jdbc> {
    private static final Logger log = LoggerFactory.getLogger(JdbcOperator.class);

    /** Shared executor instances, keyed by JDBC method name. */
    private static final Map<String, SQLExecutor<?>> sqlExecuters = new HashMap<String, SQLExecutor<?>>();

    /**
     * Read-only executors, keyed by JDBC method name. These are instantiated per call because
     * their constructor takes the result class.
     */
    private static final Map<String, ReadOnlySQLExecutorInfo> readOnlySQLExecutors = new HashMap<String, ReadOnlySQLExecutorInfo>();

    /** Union of the method names registered in the two tables above. */
    private static final Set<String> sqlExecuterKeys = new HashSet<String>();

    static {
        sqlExecuters.put("executeLargeUpdate", ExecuteLargeUpdateSQLExecutor.getInstance());
        sqlExecuters.put("executeUpdate", ExecuteUpdateSQLExecutor.getInstance());
        sqlExecuters.put("execute", ExecuteSQLExecutor.getInstance());

        readOnlySQLExecutors.put("get", new ReadOnlySQLExecutorInfo(GetSQLExecutor.class, Object.class));
        readOnlySQLExecutors.put("select", new ReadOnlySQLExecutorInfo(SelectSQLExecutor.class, HashMap.class));

        sqlExecuterKeys.addAll(sqlExecuters.keySet());
        sqlExecuterKeys.addAll(readOnlySQLExecutors.keySet());
    }

    /**
     * Parses the script of {@code jdbc} with {@code params} and executes it against the
     * configured data source.
     *
     * <p>If the method configured on {@code jdbc} is not one of the registered method names, the
     * value of the {@code jdbc.default_method} property is used instead (falling back to
     * {@code "execute"}).
     *
     * @param env the stream execution environment; not used by this operator
     * @param jdbc the JDBC job definition (script, data source, method and result class)
     * @param params the parameters available to the DSL script
     * @return whatever the selected {@link SQLExecutor} returns
     * @throws IllegalArgumentException if the data source name is blank, or if the resolved
     *         method name has no registered executor
     * @throws Exception if parsing, connecting or executing the statement fails
     */
    @Override
    public Object execute(StreamExecutionEnvironment env, Jdbc jdbc, Map<String, Object> params) throws Exception {
        // NOTE(review): the casts and raw types on the library calls below are retained from the
        // decompiled output because the declared signatures are not visible from this file.
        NamedScript namedScript = DSLUtils.parse((String)jdbc.getScript(), params);
        String datasource = jdbc.getDataSource();
        String script = namedScript.getScript();
        Map usedParams = namedScript.getParams();
        Script sql = DSLUtils.toScript(script, (Map)usedParams, (ParamsParser)JDBCParamsParser.getInstance());
        if (!StringUtils.isNotBlank(datasource)) {
            throw new IllegalArgumentException("dataSource must be not null");
        }
        if (log.isInfoEnabled()) {
            // Guarded so the parameters are only serialized to JSON when the line is actually logged.
            log.info("Execute JDBC SQL: {}; parameters: {}", script, JSONUtils.toJSONString(usedParams));
        }
        String method = jdbc.getMethod();
        if (!sqlExecuterKeys.contains(method)) {
            method = FlinkJobsContext.getProperty("jdbc.default_method", "execute");
        }
        SQLExecutor<?> executer = sqlExecuters.get(method);
        if (executer == null) {
            executer = this.getReadOnlySQLExecutor(method, jdbc.getResultClass());
        }
        return this.execute(datasource, sql.getValue(), (List)sql.getParams(), executer);
    }

    /**
     * Creates the read-only executor registered for {@code method}.
     *
     * @param method the JDBC method name to look up
     * @param resultClass fully qualified name of the result class; when blank, the default
     *        result class registered for the method is used
     * @return a new executor instance constructed with the resolved result class
     * @throws IllegalArgumentException if no read-only executor is registered for {@code method}
     * @throws Exception if the result class cannot be loaded or the executor cannot be instantiated
     */
    private ReadOnlySQLExecutor<?> getReadOnlySQLExecutor(String method, String resultClass) throws Exception {
        ReadOnlySQLExecutorInfo readOnlySQLExecutorInfo = readOnlySQLExecutors.get(method);
        if (readOnlySQLExecutorInfo == null) {
            // Reachable when the jdbc.default_method property holds a name that is not registered.
            throw new IllegalArgumentException("Unsupported JDBC method: " + method + ", supported methods are " + sqlExecuterKeys);
        }
        Class<?> type = StringUtils.isBlank(resultClass) ? readOnlySQLExecutorInfo.getDefaultResultClass() : Class.forName(resultClass);
        return readOnlySQLExecutorInfo.getExecutorClass().getConstructor(Class.class).newInstance(type);
    }

    /**
     * Opens a connection to the named data source, executes the statement and closes the
     * connection again.
     *
     * @param datasource name of the data source to look up in {@link FlinkJobsContext}
     * @param sql the SQL text with JDBC placeholders
     * @param params the values for the placeholders
     * @param sqlExecuter the executor that runs the statement and produces the result
     * @return the result produced by {@code sqlExecuter}
     * @throws SQLException if a database access error occurs
     * @throws ClassNotFoundException declared by the original signature (presumably for driver
     *         loading while obtaining the connection)
     */
    private <T> T execute(String datasource, String sql, List<Object> params, SQLExecutor<T> sqlExecuter) throws SQLException, ClassNotFoundException {
        Connection con = null;
        try {
            con = JDBCUtils.getConnection(FlinkJobsContext.getDatasource(datasource));
            con.setAutoCommit(true);
            con.setReadOnly(sqlExecuter.isReadOnly());
            return JdbcOperator.execute(con, sql, params, sqlExecuter);
        } finally {
            JDBCUtils.close(con);
        }
    }

    /**
     * Prepares {@code sql} on the given connection, binds {@code params} and delegates execution
     * to {@code sqlExecuter}. The statement and any result set are closed before returning; the
     * connection itself is left open for the caller to manage.
     *
     * @param con an open connection
     * @param sql the SQL text with JDBC placeholders
     * @param params the values for the placeholders
     * @param sqlExecuter the executor that runs the statement and produces the result
     * @return the result produced by {@code sqlExecuter}
     * @throws SQLException if a database access error occurs
     */
    public static <T> T execute(Connection con, String sql, List<Object> params, SQLExecutor<T> sqlExecuter) throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            ps = con.prepareStatement(sql);
            JDBCUtils.setParams(ps, params);
            rs = sqlExecuter.executeQuery(ps);
            return sqlExecuter.execute(ps, rs);
        } finally {
            // Nested so the statement is still closed if closing the result set fails.
            try {
                JDBCUtils.close(rs);
            } finally {
                JDBCUtils.close(ps);
            }
        }
    }

    /** Immutable pairing of a read-only executor class with its default result class. */
    private static class ReadOnlySQLExecutorInfo {
        private final Class<? extends ReadOnlySQLExecutor> executorClass;
        private final Class<?> defaultResultClass;

        public ReadOnlySQLExecutorInfo(Class<? extends ReadOnlySQLExecutor> executorClass, Class<?> defaultResultClass) {
            this.executorClass = executorClass;
            this.defaultResultClass = defaultResultClass;
        }

        public Class<? extends ReadOnlySQLExecutor> getExecutorClass() {
            return this.executorClass;
        }

        public Class<?> getDefaultResultClass() {
            return this.defaultResultClass;
        }
    }
}

