/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import oracle.sql.ROWID;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleTypeUtils;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleUtils;
import org.apache.flink.cdc.connectors.oracle.util.ChunkUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JdbcSourceChunkSplitter} implementation that divides a table into {@link ChunkRange}s
 * along a chunk key column and wraps every range in a {@link SnapshotSplit}.
 *
 * <p>All SQL is delegated to {@link OracleUtils}; type mapping is delegated to
 * {@link OracleTypeUtils}.
 */
public class OracleChunkSplitter
implements JdbcSourceChunkSplitter {
    private static final Logger LOG = LoggerFactory.getLogger(OracleChunkSplitter.class);

    /** Number of generated chunks between two throttling pauses in {@link #maySleep}. */
    private static final int CHUNKS_PER_PAUSE = 10;

    /** Length of one throttling pause, in milliseconds. */
    private static final long PAUSE_MILLIS = 100L;

    /** Decimal scale used when dividing the key span by the row count. */
    private static final int DISTRIBUTION_FACTOR_SCALE = 4;

    private final JdbcSourceConfig sourceConfig;
    private final JdbcDataSourceDialect dialect;

    /**
     * @param sourceConfig supplies the chunk key column, the split size and the distribution
     *     factor bounds
     * @param dialect used to open the JDBC connection and to query the table schema
     */
    public OracleChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
        this.sourceConfig = sourceConfig;
        this.dialect = dialect;
    }

    /**
     * Splits {@code tableId} into snapshot splits.
     *
     * <p>Opens a connection through the dialect, resolves the table schema and the chunk key
     * column, computes the chunk ranges and turns each range into a {@link SnapshotSplit} whose
     * id is {@code <tableId>:<chunk index>}.
     *
     * @param tableId table to split
     * @return one split per computed chunk range, in chunk order
     * @throws FlinkRuntimeException if anything fails while splitting; the original failure is
     *     kept as the cause
     */
    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
        try (JdbcConnection jdbc = this.dialect.openJdbcConnection(this.sourceConfig)) {
            LOG.info("Start splitting table {} into chunks...", tableId);
            long start = System.currentTimeMillis();

            // The schema is resolved once here and shared by every split of this table.
            TableChanges.TableChange tableSchema = this.dialect.queryTableSchema(jdbc, tableId);
            Table table = tableSchema.getTable();
            Column splitColumn = ChunkUtils.getChunkKeyColumn(table, this.sourceConfig.getChunkKeyColumn());

            List<ChunkRange> chunks;
            try {
                chunks = this.splitTableIntoChunks(jdbc, tableId, splitColumn);
            }
            catch (SQLException e) {
                throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
            }

            RowType splitType = this.getSplitType(splitColumn);
            List<SnapshotSplit> splits = new ArrayList<>(chunks.size());
            for (int i = 0; i < chunks.size(); ++i) {
                ChunkRange chunk = chunks.get(i);
                splits.add(this.createSnapshotSplit(tableId, tableSchema, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd()));
            }

            long end = System.currentTimeMillis();
            LOG.info("Split table {} into {} chunks, time cost: {}ms.", tableId, splits.size(), end - start);
            return splits;
        }
        catch (Exception e) {
            throw new FlinkRuntimeException(String.format("Generate Splits for table %s error", tableId), e);
        }
    }

    /** Delegates to {@link OracleUtils#queryMinMax}. The result is read as {@code [min, max]}. */
    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) throws SQLException {
        return OracleUtils.queryMinMax(jdbc, tableId, columnName);
    }

    /** Delegates to {@link OracleUtils#queryMin}. */
    public Object queryMin(JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) throws SQLException {
        return OracleUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
    }

    /** Delegates to {@link OracleUtils#queryNextChunkMax}. */
    public Object queryNextChunkMax(JdbcConnection jdbc, TableId tableId, String columnName, int chunkSize, Object includedLowerBound) throws SQLException {
        return OracleUtils.queryNextChunkMax(jdbc, tableId, columnName, chunkSize, includedLowerBound);
    }

    /** Delegates to {@link OracleUtils#queryApproximateRowCnt}. */
    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
        return OracleUtils.queryApproximateRowCnt(jdbc, tableId);
    }

    /** Delegates to {@link OracleUtils#buildSplitScanQuery}. */
    public String buildSplitScanQuery(TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
        return OracleUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
    }

    /** Delegates to {@link OracleTypeUtils#fromDbzColumn}. */
    public DataType fromDbzColumn(Column splitColumn) {
        return OracleTypeUtils.fromDbzColumn(splitColumn);
    }

    /**
     * Chooses a splitting strategy for the table and returns the resulting chunk ranges.
     *
     * <ul>
     *   <li>If the min or max of the split column is {@code null}, or both are equal, the whole
     *       table is a single chunk.</li>
     *   <li>If the split column is named {@code ROWID}, chunks are always found by querying.</li>
     *   <li>If the column qualifies for even splitting and the distribution factor lies within
     *       the configured lower and upper bounds (both inclusive), chunk boundaries are
     *       computed arithmetically.</li>
     *   <li>Otherwise chunks are found by querying.</li>
     * </ul>
     */
    private List<ChunkRange> splitTableIntoChunks(JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
        String splitColumnName = splitColumn.name();
        Object[] minMax = this.queryMinMax(jdbc, tableId, splitColumnName);
        Object min = minMax[0];
        Object max = minMax[1];
        if (min == null || max == null || min.equals(max)) {
            return Collections.singletonList(ChunkRange.all());
        }

        int chunkSize = this.sourceConfig.getSplitSize();
        double distributionFactorUpper = this.sourceConfig.getDistributionFactorUpper();
        double distributionFactorLower = this.sourceConfig.getDistributionFactorLower();

        boolean splitByRowId = splitColumnName.equals(ROWID.class.getSimpleName());
        if (!splitByRowId && this.isEvenlySplitColumn(splitColumn)) {
            long approximateRowCnt = this.queryApproximateRowCnt(jdbc, tableId);
            double distributionFactor = this.calculateDistributionFactor(tableId, min, max, approximateRowCnt);
            boolean dataIsEvenlyDistributed =
                    ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0
                            && ObjectUtils.doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
            if (dataIsEvenlyDistributed) {
                // Scale the step by the distribution factor; never go below a step of 1.
                int dynamicChunkSize = Math.max((int)(distributionFactor * (double)chunkSize), 1);
                return this.splitEvenlySizedChunks(tableId, min, max, approximateRowCnt, dynamicChunkSize);
            }
        }
        return this.splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
    }

    /**
     * Builds chunk ranges by stepping from {@code min} in increments of {@code chunkSize} without
     * querying the database.
     *
     * <p>The first range has no lower bound and the last range has no upper bound. If the
     * approximate row count does not exceed {@code chunkSize}, a single all-covering range is
     * returned.
     */
    private List<ChunkRange> splitEvenlySizedChunks(TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) {
        LOG.info("Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}", tableId, approximateRowCnt, chunkSize);
        if (approximateRowCnt <= (long)chunkSize) {
            return Collections.singletonList(ChunkRange.all());
        }

        List<ChunkRange> splits = new ArrayList<>();
        Object chunkStart = null;
        Object chunkEnd = ObjectUtils.plus(min, chunkSize);
        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
            splits.add(ChunkRange.of(chunkStart, chunkEnd));
            chunkStart = chunkEnd;
            chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
        }
        // The final chunk is left open-ended.
        splits.add(ChunkRange.of(chunkStart, null));
        return splits;
    }

    /**
     * Builds chunk ranges by repeatedly asking the database for the next chunk boundary.
     *
     * <p>The first range has no lower bound and the last range has no upper bound. After each
     * emitted chunk {@link #maySleep} is invoked to pace the boundary queries.
     */
    private List<ChunkRange> splitUnevenlySizedChunks(JdbcConnection jdbc, TableId tableId, String splitColumnName, Object min, Object max, int chunkSize) throws SQLException {
        LOG.info("Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
        List<ChunkRange> splits = new ArrayList<>();
        Object chunkStart = null;
        Object chunkEnd = this.nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
        int count = 0;
        while (chunkEnd != null && this.isChunkEndLeMax(chunkEnd, max)) {
            splits.add(ChunkRange.of(chunkStart, chunkEnd));
            OracleChunkSplitter.maySleep(count++, tableId);
            chunkStart = chunkEnd;
            chunkEnd = this.nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
        }
        // The final chunk is left open-ended.
        splits.add(ChunkRange.of(chunkStart, null));
        return splits;
    }

    /** Returns {@code true} when {@code chunkEnd} is non-null and compares {@code <= max}. */
    private boolean isChunkEndLeMax(Object chunkEnd, Object max) {
        return chunkEnd != null && OracleChunkSplitter.compareSplitKeys(chunkEnd, max) <= 0;
    }

    /** Returns {@code true} when {@code chunkEnd} is non-null and compares {@code >= max}. */
    private boolean isChunkEndGeMax(Object chunkEnd, Object max) {
        return chunkEnd != null && OracleChunkSplitter.compareSplitKeys(chunkEnd, max) >= 0;
    }

    /**
     * Compares two split key values: byte-wise via {@link ROWID#compareBytes} when both are
     * {@link ROWID}s, otherwise via {@link ObjectUtils#compare}.
     */
    private static int compareSplitKeys(Object left, Object right) {
        if (left instanceof ROWID && right instanceof ROWID) {
            return ROWID.compareBytes(((ROWID)left).getBytes(), ((ROWID)right).getBytes());
        }
        return ObjectUtils.compare(left, right);
    }

    /**
     * Finds the end of the chunk that starts at {@code previousChunkEnd}.
     *
     * <p>If the queried boundary equals {@code previousChunkEnd}, {@link #queryMin} is used
     * instead, with that boundary passed as the excluded lower bound, so the boundary can move
     * forward.
     *
     * @return the next chunk end, or {@code null} once the boundary reaches or passes {@code max}
     */
    private Object nextChunkEnd(JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, String splitColumnName, Object max, int chunkSize) throws SQLException {
        Object chunkEnd = this.queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
        if (Objects.equals(previousChunkEnd, chunkEnd)) {
            chunkEnd = this.queryMin(jdbc, tableId, splitColumnName, chunkEnd);
        }
        if (this.isChunkEndGeMax(chunkEnd, max)) {
            return null;
        }
        return chunkEnd;
    }

    /**
     * Creates the {@link SnapshotSplit} for one chunk.
     *
     * @param tableId table the split belongs to
     * @param tableSchema schema of {@code tableId}, stored in the split's schema map
     * @param chunkId index of the chunk, used in the split id
     * @param splitKeyType row type of the split key
     * @param chunkStart lower bound of the chunk, or {@code null} for none
     * @param chunkEnd upper bound of the chunk, or {@code null} for none
     */
    private SnapshotSplit createSnapshotSplit(TableId tableId, TableChanges.TableChange tableSchema, int chunkId, RowType splitKeyType, Object chunkStart, Object chunkEnd) {
        // A missing bound stays null; a present bound becomes a one-element key array.
        Object[] splitStart = chunkStart == null ? null : new Object[]{chunkStart};
        Object[] splitEnd = chunkEnd == null ? null : new Object[]{chunkEnd};
        // Each split gets its own map instance.
        HashMap<TableId, TableChanges.TableChange> schema = new HashMap<>();
        schema.put(tableId, tableSchema);
        return new SnapshotSplit(tableId, OracleChunkSplitter.splitId(tableId, chunkId), splitKeyType, splitStart, splitEnd, null, schema);
    }

    /**
     * Computes {@code (max - min + 1) / approximateRowCnt}, rounded up to four decimal places.
     *
     * @return the distribution factor, or {@link Double#MAX_VALUE} when the row count is zero
     * @throws IllegalStateException if {@code min} and {@code max} are of different classes
     */
    private double calculateDistributionFactor(TableId tableId, Object min, Object max, long approximateRowCnt) {
        if (!min.getClass().equals(max.getClass())) {
            throw new IllegalStateException(String.format("Unsupported operation type, the MIN value type %s is different with MAX value type %s.", min.getClass().getSimpleName(), max.getClass().getSimpleName()));
        }
        if (approximateRowCnt == 0L) {
            return Double.MAX_VALUE;
        }
        BigDecimal difference = ObjectUtils.minus(max, min);
        BigDecimal subRowCnt = difference.add(BigDecimal.ONE);
        double distributionFactor = subRowCnt
                .divide(BigDecimal.valueOf(approximateRowCnt), DISTRIBUTION_FACTOR_SCALE, RoundingMode.CEILING)
                .doubleValue();
        LOG.info("The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", tableId, distributionFactor, min, max, approximateRowCnt);
        return distributionFactor;
    }

    /** Builds the split id as {@code <tableId>:<chunkId>}. */
    private static String splitId(TableId tableId, int chunkId) {
        return tableId.toString() + ":" + chunkId;
    }

    /**
     * Pauses for {@value #PAUSE_MILLIS} ms and logs progress whenever {@code count} is a multiple
     * of {@value #CHUNKS_PER_PAUSE} (including zero).
     *
     * <p>If the pause is interrupted, the thread's interrupt status is restored so that the
     * caller can still observe the interruption.
     */
    private static void maySleep(int count, TableId tableId) {
        if (count % CHUNKS_PER_PAUSE != 0) {
            return;
        }
        try {
            Thread.sleep(PAUSE_MILLIS);
        }
        catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again instead of losing it.
            Thread.currentThread().interrupt();
        }
        LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
    }
}

