/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.mysql.ingest;

import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLClient;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental dumper that subscribes to a MySQL binlog through {@link MySQLClient}, converts the polled
 * binlog events into pipeline records and pushes them into the configured {@link PipelineChannel}.
 *
 * <p>Row events (write/update/delete) whose schema and table match the dumper configuration become
 * {@link DataRecord}s. Every other event becomes a {@link PlaceholderRecord} carrying the event's binlog position.
 * When the polling loop ends, a {@link FinishedRecord} is pushed.</p>
 */
public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<BinlogPosition> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MySQLIncrementalDumper.class);

    /** Value handlers keyed by {@link ValueHandler#getTypeName()}, looked up by column data type name. */
    private static final Map<String, ValueHandler> VALUE_HANDLER_MAP = SingletonSPIRegistry.getSingletonInstancesMap(ValueHandler.class, ValueHandler::getTypeName);

    private final BinlogPosition binlogPosition;

    private final DumperConfiguration dumperConfig;

    private final PipelineTableMetaDataLoader metaDataLoader;

    /** Source of the random integer handed to {@link ConnectInfo} as its first argument. */
    private final Random random = new SecureRandom();

    private final PipelineChannel channel;

    /**
     * Creates the dumper.
     *
     * @param dumperConfig dumper configuration; its data source configuration must be a
     *                     {@link StandardPipelineDataSourceConfiguration}
     * @param binlogPosition position to subscribe from; must be a {@link BinlogPosition} instance
     * @param channel channel that receives the produced records
     * @param metaDataLoader loader used to resolve table and column meta data
     * @throws IllegalArgumentException if the data source configuration is not a
     *                                  {@link StandardPipelineDataSourceConfiguration}
     * @throws ClassCastException if {@code binlogPosition} is not a {@link BinlogPosition}
     */
    public MySQLIncrementalDumper(DumperConfiguration dumperConfig, IngestPosition<BinlogPosition> binlogPosition, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader) {
        super(dumperConfig, binlogPosition, channel, metaDataLoader);
        this.binlogPosition = (BinlogPosition) binlogPosition;
        this.dumperConfig = dumperConfig;
        Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
        this.channel = channel;
        this.metaDataLoader = metaDataLoader;
    }

    /**
     * Runs the dump loop.
     */
    protected void doStart() {
        dump();
    }

    /**
     * Connects to MySQL, subscribes at the configured binlog position and handles polled events
     * for as long as {@code isRunning()} returns true, then pushes a {@link FinishedRecord}.
     */
    private void dump() {
        StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig();
        YamlJdbcConfiguration jdbcConfig = dataSourceConfig.getJdbcConfig();
        log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
        DataSourceMetaData metaData = DatabaseTypeRegistry.getActualDatabaseType("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(), null);
        ConnectInfo connectInfo = new ConnectInfo(random.nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
        // NOTE(review): the client is never released in this class and doStop() is empty;
        // confirm whether MySQLClient exposes a close/disconnect hook that should be called.
        MySQLClient client = new MySQLClient(connectInfo);
        client.connect();
        client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
        // long rather than int: the loop is unbounded, so an int counter could overflow.
        long eventCount = 0L;
        while (isRunning()) {
            AbstractBinlogEvent event = client.poll();
            if (null == event) {
                continue;
            }
            handleEvent(metaData.getCatalog(), event);
            ++eventCount;
        }
        log.info("incremental dump, eventCount={}", eventCount);
        pushRecord(new FinishedRecord(new PlaceholderPosition()));
    }

    /**
     * Dispatches one binlog event.
     *
     * <p>Placeholder events, events that are not row events, and row events rejected by
     * {@link #filter(String, AbstractRowsEvent)} produce a placeholder record. Accepted
     * write/update/delete row events are converted to data records.</p>
     *
     * @param catalog catalog of the dumped data source, compared against the event's schema name
     * @param event polled binlog event, never null
     */
    private void handleEvent(String catalog, AbstractBinlogEvent event) {
        // Check the type before casting so that an unexpected event type cannot raise ClassCastException.
        if (event instanceof PlaceholderEvent || !(event instanceof AbstractRowsEvent) || filter(catalog, (AbstractRowsEvent) event)) {
            createPlaceholderRecord(event);
            return;
        }
        if (event instanceof WriteRowsEvent) {
            handleWriteRowsEvent((WriteRowsEvent) event);
        } else if (event instanceof UpdateRowsEvent) {
            handleUpdateRowsEvent((UpdateRowsEvent) event);
        } else if (event instanceof DeleteRowsEvent) {
            handleDeleteRowsEvent((DeleteRowsEvent) event);
        }
    }

    /**
     * Tells whether a row event must be skipped.
     *
     * @param database expected schema name
     * @param event row event to test
     * @return true if the event's schema differs from {@code database} or its table is not a key of the configured table name map
     */
    private boolean filter(String database, AbstractRowsEvent event) {
        return !event.getSchemaName().equals(database) || !dumperConfig.getTableNameMap().containsKey(event.getTableName());
    }

    /**
     * Pushes one {@code INSERT} record per after-row of the event.
     *
     * @param event write rows event
     */
    private void handleWriteRowsEvent(WriteRowsEvent event) {
        pushFullRowRecords(event, event.getAfterRows(), "INSERT");
    }

    /**
     * Pushes one {@code UPDATE} record per row pair of the event.
     *
     * <p>A column is flagged as updated when its before and after values differ. The old value is
     * only attached for primary key columns that changed; otherwise it is null.</p>
     *
     * @param event update rows event; before and after rows are read at the same index
     */
    private void handleUpdateRowsEvent(UpdateRowsEvent event) {
        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(event.getTableName());
        for (int i = 0; i < event.getBeforeRows().size(); ++i) {
            Serializable[] beforeValues = event.getBeforeRows().get(i);
            Serializable[] afterValues = event.getAfterRows().get(i);
            DataRecord record = createDataRecord(event, beforeValues.length);
            record.setType("UPDATE");
            for (int j = 0; j < beforeValues.length; ++j) {
                Serializable newValue = afterValues[j];
                Serializable oldValue = beforeValues[j];
                boolean updated = !Objects.equals(newValue, oldValue);
                PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j);
                Serializable oldKeyValue = columnMetaData.isPrimaryKey() && updated ? handleValue(columnMetaData, oldValue) : null;
                record.addColumn(new Column(columnMetaData.getName(), oldKeyValue, handleValue(columnMetaData, newValue), updated, columnMetaData.isPrimaryKey()));
            }
            pushRecord(record);
        }
    }

    /**
     * Pushes one {@code DELETE} record per before-row of the event.
     *
     * @param event delete rows event
     */
    private void handleDeleteRowsEvent(DeleteRowsEvent event) {
        pushFullRowRecords(event, event.getBeforeRows(), "DELETE");
    }

    /**
     * Builds and pushes one data record per row, with every column flagged as updated.
     *
     * @param event row event supplying table name and position
     * @param rows row values; the value at index {@code i} is paired with the column meta data at index {@code i}
     * @param type record type set on each record
     */
    private void pushFullRowRecords(AbstractRowsEvent event, Iterable<Serializable[]> rows, String type) {
        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(event.getTableName());
        for (Serializable[] each : rows) {
            DataRecord record = createDataRecord(event, each.length);
            record.setType(type);
            for (int i = 0; i < each.length; ++i) {
                PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i);
                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isPrimaryKey()));
            }
            pushRecord(record);
        }
    }

    /**
     * Converts a raw binlog value with the handler registered for the column's data type name.
     *
     * @param columnMetaData meta data of the column the value belongs to
     * @param value raw value
     * @return the handler's result, or {@code value} unchanged when no handler is registered for the type
     */
    private Serializable handleValue(PipelineColumnMetaData columnMetaData, Serializable value) {
        ValueHandler valueHandler = VALUE_HANDLER_MAP.get(columnMetaData.getDataTypeName());
        return null == valueHandler ? value : valueHandler.handle(value);
    }

    /**
     * Creates an empty data record positioned at the event's binlog coordinates.
     *
     * @param rowsEvent source row event
     * @param columnCount column count passed to the record constructor
     * @return record with its table name taken from the configured table name map and its commit time set
     */
    private DataRecord createDataRecord(AbstractRowsEvent rowsEvent, int columnCount) {
        DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
        result.setTableName((String) dumperConfig.getTableNameMap().get(rowsEvent.getTableName()));
        // NOTE(review): presumably converts an event timestamp in seconds to milliseconds - confirm.
        result.setCommitTime(rowsEvent.getTimestamp() * 1000L);
        return result;
    }

    /**
     * Pushes a placeholder record carrying the event's binlog position and commit time.
     *
     * @param event event that produces no data record
     */
    private void createPlaceholderRecord(AbstractBinlogEvent event) {
        PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
        // NOTE(review): presumably converts an event timestamp in seconds to milliseconds - confirm.
        record.setCommitTime(event.getTimestamp() * 1000L);
        pushRecord(record);
    }

    /**
     * Hands a record to the channel.
     *
     * @param record record to push
     */
    private void pushRecord(Record record) {
        channel.pushRecord(record);
    }

    /**
     * Does nothing; the dump loop ends when {@code isRunning()} returns false.
     */
    protected void doStop() {
    }
}

