/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.cdc.core.connector;

import io.netty.channel.Channel;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtil;
import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtil;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Importer connector that sends CDC data records to a client through a Netty {@link Channel}.
 *
 * <p>Records are either converted and written to the channel straight away, or buffered in one bounded
 * queue per importer and merged by a background {@link CDCIncrementalImporterTask} using the configured
 * data record comparator.</p>
 */
public final class SocketSinkImporterConnector implements ImporterConnector {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SocketSinkImporterConnector.class);

    /** Pause, in milliseconds, used both while waiting for the channel and while the merge task is idle. */
    private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;

    private final Lock lock = new ReentrantLock();

    // Nothing in this class signals the condition, so awaiting it acts as a timed pause.
    private final Condition condition = this.lock.newCondition();

    private volatile boolean incrementalTaskRunning = true;

    private final ShardingSphereDatabase database;

    private final Channel channel;

    private final int jobShardingCount;

    private final Comparator<DataRecord> dataRecordComparator;

    /** Table name to schema name, built from the dotted names passed to the constructor. */
    private final Map<String, String> tableNameSchemaMap = new HashMap<>();

    /** One bounded queue of pending incremental records per importer. */
    private final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap = new ConcurrentHashMap<>();

    private final AtomicInteger runningIncrementalTaskCount = new AtomicInteger(0);

    private Thread incrementalImporterTask;

    /**
     * Creates the connector.
     *
     * @param channel channel the records are written to
     * @param database database whose name is attached to every converted record
     * @param jobShardingCount number of incremental start events required before the merge task is started
     * @param schemaTableNames names in {@code schema.table} form; entries without a dot are ignored
     * @param dataRecordComparator comparator used to merge incremental records, or {@code null} to write
     *                             every batch immediately
     */
    public SocketSinkImporterConnector(Channel channel, ShardingSphereDatabase database, int jobShardingCount, Collection<String> schemaTableNames, Comparator<DataRecord> dataRecordComparator) {
        this.channel = channel;
        this.database = database;
        this.jobShardingCount = jobShardingCount;
        schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
            String[] split = each.split("\\.");
            // First segment is stored as the schema of the second segment.
            this.tableNameSchemaMap.put(split[1], split[0]);
        });
        this.dataRecordComparator = dataRecordComparator;
    }

    /**
     * Returns the underlying channel.
     *
     * @return the channel passed to the constructor
     */
    public Object getConnector() {
        return channel;
    }

    /**
     * Writes a batch of records coming from one importer.
     *
     * <p>Inventory batches, and every batch when no comparator is configured, are written to the channel
     * immediately. Incremental batches are otherwise put into the importer's queue. A batch that holds no
     * data record and ends with a {@link FinishedRecord} is only acknowledged, not sent.</p>
     *
     * @param recordList records to write; an empty list is a no-op
     * @param socketSinkImporter importer that produced the records
     * @param importerType type of the importer
     */
    public void write(List<Record> recordList, SocketSinkImporter socketSinkImporter, ImporterType importerType) {
        if (recordList.isEmpty()) {
            return;
        }
        if (ImporterType.INVENTORY == importerType || null == dataRecordComparator) {
            int dataRecordCount = (int) recordList.stream().filter(each -> each instanceof DataRecord).count();
            Record lastRecord = recordList.get(recordList.size() - 1);
            if (lastRecord instanceof FinishedRecord && 0 == dataRecordCount) {
                socketSinkImporter.ackWithLastDataRecord(new CDCAckPosition(lastRecord, 0));
                return;
            }
            Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
            importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(RecordUtil.getLastNormalRecord(recordList), dataRecordCount));
            writeImmediately(recordList, importerDataRecordMap);
        } else if (ImporterType.INCREMENTAL == importerType) {
            writeIntoQueue(recordList, socketSinkImporter);
        }
    }

    /**
     * Converts the data records of the batch and writes them to the channel in one response.
     *
     * <p>Waits while the channel is active but not writable. Nothing is sent when the channel is inactive
     * or when the calling thread is interrupted during the wait.</p>
     *
     * @param recordList records to send; entries that are not {@link DataRecord} are skipped
     * @param importerDataRecordMap ack positions bound to the ack id of the response
     */
    private void writeImmediately(List<? extends Record> recordList, Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
        while (!channel.isWritable() && channel.isActive()) {
            if (!doAwait()) {
                // The interrupt flag is already restored; give up instead of spinning on an interrupted thread.
                log.warn("interrupted while waiting for the channel to become writable, records are not sent");
                return;
            }
        }
        if (!channel.isActive()) {
            return;
        }
        List<DataRecordResult.Record> records = new LinkedList<>();
        for (Record each : recordList) {
            if (!(each instanceof DataRecord)) {
                continue;
            }
            DataRecord dataRecord = (DataRecord) each;
            records.add(DataRecordResultConvertUtil.convertDataRecordToRecord(database.getName(), tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
        }
        String ackId = CDCAckHolder.getInstance().bindAckIdWithPosition(importerDataRecordMap);
        DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(records).setAckId(ackId).build();
        channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
    }

    /**
     * Pauses for at most {@link #DEFAULT_TIMEOUT_MILLISECONDS} milliseconds.
     *
     * @return {@code true} if the pause ended normally, {@code false} if the thread was interrupted, in
     *         which case the interrupt flag has been restored
     */
    private boolean doAwait() {
        lock.lock();
        try {
            condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Puts the records into the queue registered for the importer, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted, the interrupt flag is restored and the remaining records
     * are not queued.</p>
     *
     * @param dataRecords records to queue
     * @param socketSinkImporter importer whose queue receives the records
     */
    private void writeIntoQueue(List<Record> dataRecords, SocketSinkImporter socketSinkImporter) {
        BlockingQueue<Record> blockingQueue = incrementalRecordMap.get(socketSinkImporter);
        if (null == blockingQueue) {
            log.warn("not find the queue to write");
            return;
        }
        try {
            for (Record each : dataRecords) {
                blockingQueue.put(each);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            log.warn("interrupted while writing records into the queue, remaining records are not queued");
        }
    }

    /**
     * Registers a queue for the importer and starts the merge task once enough importers have reported.
     *
     * <p>The task is started only when a comparator is configured and this method has been called at least
     * {@code jobShardingCount} times.</p>
     *
     * @param socketSinkImporter importer entering the incremental phase
     * @param batchSize capacity of the importer's queue and maximum number of records per merged batch
     */
    public void sendIncrementalStartEvent(SocketSinkImporter socketSinkImporter, int batchSize) {
        incrementalRecordMap.computeIfAbsent(socketSinkImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
        int count = runningIncrementalTaskCount.incrementAndGet();
        if (count < jobShardingCount || null == dataRecordComparator) {
            return;
        }
        log.debug("start CDC incremental importer");
        // NOTE(review): this check-then-start is not synchronized; confirm callers cannot race here.
        if (null == incrementalImporterTask) {
            incrementalImporterTask = new Thread(new CDCIncrementalImporterTask(batchSize));
            incrementalImporterTask.start();
        }
    }

    /**
     * Removes the importer's queue and, for an incremental importer, clears the running flag so that the
     * merge task leaves its loop.
     *
     * @param socketSinkImporter importer being cleaned up
     */
    public void clean(SocketSinkImporter socketSinkImporter) {
        incrementalRecordMap.remove(socketSinkImporter);
        if (ImporterType.INCREMENTAL == socketSinkImporter.getImporterType()) {
            incrementalTaskRunning = false;
        }
    }

    /**
     * Returns the sink type name.
     *
     * @return name of {@link CDCSinkType#SOCKET}
     */
    public String getType() {
        return CDCSinkType.SOCKET.name();
    }

    /**
     * Sets the flag polled by the merge task's loop.
     *
     * @param incrementalTaskRunning {@code false} to make the merge task leave its loop
     */
    @Generated
    public void setIncrementalTaskRunning(boolean incrementalTaskRunning) {
        this.incrementalTaskRunning = incrementalTaskRunning;
    }

    /**
     * Background task that repeatedly collects up to {@code batchSize} minimum records from the importer
     * queues and writes them to the channel as one batch.
     */
    private final class CDCIncrementalImporterTask implements Runnable {
        private final int batchSize;

        @Generated
        public CDCIncrementalImporterTask(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void run() {
            // Also stop on interrupt: writeImmediately drops batches on an interrupted thread.
            while (incrementalTaskRunning && !Thread.currentThread().isInterrupted()) {
                Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
                List<DataRecord> dataRecords = new LinkedList<>();
                for (int i = 0; i < batchSize; i++) {
                    DataRecord minimumDataRecord = CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(incrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
                    if (null == minimumDataRecord) {
                        break;
                    }
                    dataRecords.add(minimumDataRecord);
                }
                if (dataRecords.isEmpty()) {
                    // Nothing to send yet; back off before polling the queues again.
                    ThreadUtil.sleep((int) DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
                    continue;
                }
                writeImmediately(dataRecords, cdcAckPositionMap);
            }
        }
    }
}

