/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.pherf.workload;

import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.configuration.Column;
import org.apache.phoenix.pherf.configuration.Scenario;
import org.apache.phoenix.pherf.configuration.WriteParams;
import org.apache.phoenix.pherf.configuration.XMLConfigParser;
import org.apache.phoenix.pherf.exception.PherfException;
import org.apache.phoenix.pherf.result.DataLoadThreadTime;
import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
import org.apache.phoenix.pherf.result.ResultUtil;
import org.apache.phoenix.pherf.rules.DataValue;
import org.apache.phoenix.pherf.rules.RulesApplier;
import org.apache.phoenix.pherf.util.PhoenixUtil;
import org.apache.phoenix.pherf.util.RowCalculator;
import org.apache.phoenix.pherf.workload.Workload;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WriteWorkload
implements Workload {
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteWorkload.class);
    public static final String USE_BATCH_API_PROPERTY = "pherf.default.dataloader.batchApi";
    private final PhoenixUtil pUtil;
    private final XMLConfigParser parser;
    private final RulesApplier rulesApplier;
    private final ResultUtil resultUtil;
    private final ExecutorService pool;
    private final WriteParams writeParams;
    private final Scenario scenario;
    private final long threadSleepDuration;
    private final int threadPoolSize;
    private final int batchSize;
    private final PherfConstants.GeneratePhoenixStats generateStatistics;
    private final boolean useBatchApi;

    public WriteWorkload(XMLConfigParser parser) throws Exception {
        this(PhoenixUtil.create(), parser, PherfConstants.GeneratePhoenixStats.NO);
    }

    public WriteWorkload(XMLConfigParser parser, PherfConstants.GeneratePhoenixStats generateStatistics) throws Exception {
        this(PhoenixUtil.create(), parser, generateStatistics);
    }

    public WriteWorkload(PhoenixUtil util, XMLConfigParser parser, PherfConstants.GeneratePhoenixStats generateStatistics) throws Exception {
        this(util, parser, null, generateStatistics);
    }

    public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser, Scenario scenario, PherfConstants.GeneratePhoenixStats generateStatistics) throws Exception {
        this(phoenixUtil, PherfConstants.create().getProperties("pherf.properties", true), parser, scenario, generateStatistics);
    }

    public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser, Scenario scenario, PherfConstants.GeneratePhoenixStats generateStatistics) throws Exception {
        this.pUtil = phoenixUtil;
        this.parser = parser;
        this.rulesApplier = new RulesApplier(parser);
        this.resultUtil = new ResultUtil();
        this.generateStatistics = generateStatistics;
        int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
        if (scenario != null) {
            this.scenario = scenario;
            this.writeParams = scenario.getWriteParams();
            if (this.writeParams != null) {
                this.threadSleepDuration = this.writeParams.getThreadSleepDuration();
                size = this.writeParams.getWriterThreadCount();
            } else {
                this.threadSleepDuration = 0L;
            }
        } else {
            this.writeParams = null;
            this.scenario = null;
            this.threadSleepDuration = 0L;
        }
        this.useBatchApi = Boolean.getBoolean(USE_BATCH_API_PROPERTY);
        this.threadPoolSize = size == 0 ? Runtime.getRuntime().availableProcessors() : size;
        this.pool = Executors.newFixedThreadPool(this.threadPoolSize);
        String bSize = this.writeParams == null || this.writeParams.getBatchSize() == Long.MIN_VALUE ? properties.getProperty("pherf.default.dataloader.batchsize") : String.valueOf(this.writeParams.getBatchSize());
        this.batchSize = bSize == null ? 1000 : Integer.parseInt(bSize);
    }

    @Override
    public void complete() {
        this.pool.shutdownNow();
    }

    @Override
    public Callable<Void> execute() throws Exception {
        return new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                try {
                    DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
                    DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
                    if (WriteWorkload.this.scenario == null) {
                        for (Scenario scenario : WriteWorkload.this.getParser().getScenarios()) {
                            WriteWorkload.this.exec(dataLoadTimeSummary, dataLoadThreadTime, scenario);
                        }
                    } else {
                        WriteWorkload.this.exec(dataLoadTimeSummary, dataLoadThreadTime, WriteWorkload.this.scenario);
                    }
                    WriteWorkload.this.resultUtil.write(dataLoadTimeSummary);
                    WriteWorkload.this.resultUtil.write(dataLoadThreadTime);
                }
                catch (Exception e) {
                    LOGGER.error("WriteWorkLoad failed", (Throwable)e);
                    throw e;
                }
                return null;
            }
        };
    }

    private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary, DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception {
        LOGGER.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName());
        this.pUtil.executeScenarioDdl(scenario.getPreScenarioDdls(), scenario.getTenantId(), dataLoadTimeSummary);
        List<Future<Info>> writeBatches = this.getBatches(dataLoadThreadTime, scenario);
        this.waitForBatches(dataLoadTimeSummary, scenario, EnvironmentEdgeManager.currentTimeMillis(), writeBatches);
        if (this.generateStatistics == PherfConstants.GeneratePhoenixStats.YES) {
            LOGGER.info("Updating Phoenix table statistics...");
            this.pUtil.updatePhoenixStats(scenario.getTableName(), scenario);
            LOGGER.info("Stats update done!");
        } else {
            LOGGER.info("Phoenix table stats update not requested.");
        }
        this.pUtil.executeScenarioDdl(scenario.getPostScenarioDdls(), scenario.getTenantId(), dataLoadTimeSummary);
    }

    private List<Future<Info>> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception {
        RowCalculator rowCalculator = new RowCalculator(this.getThreadPoolSize(), scenario.getRowCount());
        ArrayList<Future<Info>> writeBatches = new ArrayList<Future<Info>>();
        for (int i = 0; i < this.getThreadPoolSize(); ++i) {
            List<Column> phxMetaCols = this.pUtil.getColumnsFromPhoenix(scenario.getSchemaName(), scenario.getTableNameWithoutSchemaName(), this.pUtil.getConnection(scenario.getTenantId()));
            int threadRowCount = rowCalculator.getNext();
            LOGGER.info("Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows.");
            Future<Info> write = this.upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount, dataLoadThreadTime, this.useBatchApi);
            writeBatches.add(write);
        }
        if (writeBatches.isEmpty()) {
            throw new PherfException("Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason.");
        }
        return writeBatches;
    }

    private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario, long start, List<Future<Info>> writeBatches) throws InterruptedException, ExecutionException {
        int sumRows = 0;
        int sumDuration = 0;
        for (Future<Info> write : writeBatches) {
            Info writeInfo = write.get();
            sumRows += writeInfo.getRowCount();
            sumDuration = (int)((long)sumDuration + writeInfo.getDuration());
            LOGGER.info("Executor (" + this.hashCode() + ") writes complete with row count (" + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")");
        }
        long testDuration = EnvironmentEdgeManager.currentTimeMillis() - start;
        LOGGER.info("Writes completed with total row count (" + sumRows + ") with total elapsed time of (" + testDuration + ") ms and total CPU execution time of (" + sumDuration + ") ms");
        dataLoadTimeSummary.add(scenario.getTableName(), sumRows, (int)testDuration);
    }

    public Future<Info> upsertData(final Scenario scenario, final List<Column> columns, final String tableName, final int rowCount, final DataLoadThreadTime dataLoadThreadTime, final boolean useBatchApi) {
        Future<Info> future = this.pool.submit(new Callable<Info>(){

            @Override
            public Info call() throws Exception {
                int rowsCreated = 0;
                long start = 0L;
                long last = 0L;
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Connection connection = null;
                Statement stmt = null;
                try {
                    connection = WriteWorkload.this.pUtil.getConnection(scenario.getTenantId());
                    long logStartTime = EnvironmentEdgeManager.currentTimeMillis();
                    long maxDuration = WriteWorkload.this.writeParams == null ? Long.MAX_VALUE : WriteWorkload.this.writeParams.getExecutionDurationInMs();
                    int logPerNRows = 1000000;
                    String customizedLogPerNRows = connection.getClientInfo().getProperty("pherf.default.log_per_nrows");
                    if (customizedLogPerNRows != null) {
                        logPerNRows = Integer.valueOf(customizedLogPerNRows);
                    }
                    last = start = EnvironmentEdgeManager.currentTimeMillis();
                    String sql = WriteWorkload.this.buildSql(columns, tableName);
                    stmt = connection.prepareStatement(sql);
                    for (long i = (long)rowCount; i > 0L && EnvironmentEdgeManager.currentTimeMillis() - logStartTime < maxDuration; --i) {
                        stmt = WriteWorkload.this.buildStatement(scenario, columns, (PreparedStatement)stmt, simpleDateFormat);
                        if (useBatchApi) {
                            stmt.addBatch();
                        } else {
                            rowsCreated += stmt.executeUpdate();
                        }
                        if (i % (long)WriteWorkload.this.getBatchSize() != 0L) continue;
                        if (useBatchApi) {
                            int[] results = stmt.executeBatch();
                            for (int x = 0; x < results.length; ++x) {
                                int result = results[x];
                                if (result < 1) {
                                    String msg = "Failed to write update in batch (update count=" + result + ")";
                                    throw new RuntimeException(msg);
                                }
                                rowsCreated += result;
                            }
                        }
                        connection.commit();
                        long duration = EnvironmentEdgeManager.currentTimeMillis() - last;
                        LOGGER.info("Writer (" + Thread.currentThread().getName() + ") committed Batch. Total " + WriteWorkload.this.getBatchSize() + " rows for this thread (" + this.hashCode() + ") in (" + duration + ") Ms");
                        if (i % (long)logPerNRows == 0L && i != 0L) {
                            dataLoadThreadTime.add(tableName, Thread.currentThread().getName(), i, EnvironmentEdgeManager.currentTimeMillis() - logStartTime);
                        }
                        logStartTime = EnvironmentEdgeManager.currentTimeMillis();
                        Thread.sleep(WriteWorkload.this.threadSleepDuration);
                        last = EnvironmentEdgeManager.currentTimeMillis();
                    }
                }
                catch (SQLException e) {
                    LOGGER.error("Scenario " + scenario.getName() + " failed with exception ", (Throwable)e);
                    throw e;
                }
                finally {
                    if (!useBatchApi && stmt != null) {
                        stmt.close();
                    }
                    if (connection != null) {
                        if (useBatchApi && stmt != null) {
                            int[] results = stmt.executeBatch();
                            for (int x = 0; x < results.length; ++x) {
                                int result = results[x];
                                if (result < 1) {
                                    String msg = "Failed to write update in batch (update count=" + result + ")";
                                    throw new RuntimeException(msg);
                                }
                                rowsCreated += result;
                            }
                            stmt.close();
                        }
                        try {
                            connection.commit();
                            long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
                            LOGGER.info("Writer ( " + Thread.currentThread().getName() + ") committed Final Batch. Duration (" + duration + ") Ms");
                            connection.close();
                        }
                        catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                }
                long totalDuration = EnvironmentEdgeManager.currentTimeMillis() - start;
                return new Info(totalDuration, rowsCreated);
            }
        });
        return future;
    }

    private PreparedStatement buildStatement(Scenario scenario, List<Column> columns, PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
        int count = 1;
        for (Column column : columns) {
            DataValue dataValue = this.getRulesApplier().getDataForRule(scenario, column);
            switch (column.getType()) {
                case VARCHAR: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, 12);
                        break;
                    }
                    statement.setString(count, dataValue.getValue());
                    break;
                }
                case CHAR: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, 1);
                        break;
                    }
                    statement.setString(count, dataValue.getValue());
                    break;
                }
                case DECIMAL: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, 3);
                        break;
                    }
                    statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
                    break;
                }
                case INTEGER: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, 4);
                        break;
                    }
                    statement.setInt(count, Integer.parseInt(dataValue.getValue()));
                    break;
                }
                case UNSIGNED_LONG: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, 1111);
                        break;
                    }
                    statement.setLong(count, Long.parseLong(dataValue.getValue()));
                    break;
                }
                case BIGINT: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, -5);
                        break;
                    }
                    statement.setLong(count, Long.parseLong(dataValue.getValue()));
                    break;
                }
                case TINYINT: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, -6);
                        break;
                    }
                    statement.setLong(count, Integer.parseInt(dataValue.getValue()));
                    break;
                }
                case DATE: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, 91);
                        break;
                    }
                    Date date = new Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
                    statement.setDate(count, date);
                    break;
                }
                case VARCHAR_ARRAY: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, 2003);
                        break;
                    }
                    Array arr = statement.getConnection().createArrayOf("VARCHAR", dataValue.getValue().split(","));
                    statement.setArray(count, arr);
                    break;
                }
                case VARBINARY: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, -3);
                        break;
                    }
                    statement.setBytes(count, dataValue.getValue().getBytes());
                    break;
                }
                case TIMESTAMP: {
                    if (dataValue.getValue().equals("")) {
                        statement.setNull(count, 93);
                        break;
                    }
                    Timestamp ts = new Timestamp(simpleDateFormat.parse(dataValue.getValue()).getTime());
                    statement.setTimestamp(count, ts);
                    break;
                }
            }
            ++count;
        }
        return statement;
    }

    private String buildSql(List<Column> columns, String tableName) {
        StringBuilder builder = new StringBuilder();
        builder.append("upsert into ");
        builder.append(tableName);
        builder.append(" (");
        int count = 1;
        for (Column column : columns) {
            builder.append(column.getName());
            if (count < columns.size()) {
                builder.append(",");
            } else {
                builder.append(")");
            }
            ++count;
        }
        builder.append(" VALUES (");
        for (int i = 0; i < columns.size(); ++i) {
            if (i < columns.size() - 1) {
                builder.append("?,");
                continue;
            }
            builder.append("?)");
        }
        return builder.toString();
    }

    public XMLConfigParser getParser() {
        return this.parser;
    }

    public RulesApplier getRulesApplier() {
        return this.rulesApplier;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public int getThreadPoolSize() {
        return this.threadPoolSize;
    }

    private class Info {
        private final int rowCount;
        private final long duration;

        public Info(long duration, int rows) {
            this.duration = duration;
            this.rowCount = rows;
        }

        public long getDuration() {
            return this.duration;
        }

        public int getRowCount() {
            return this.rowCount;
        }
    }
}

