package com.hiriver.channel.stream.impl;

import com.hiriver.channel.BinlogDataSet;
import com.hiriver.channel.stream.BinlogPositionStoreTrigger;
import com.hiriver.channel.stream.BufferableBinlogDataSet;
import com.hiriver.channel.stream.ChannelBuffer;
import com.hiriver.channel.stream.ChannelStream;
import com.hiriver.channel.stream.Consumer;
import com.hiriver.channel.stream.TransactionRecognizer;
import com.hiriver.position.store.BinlogPositionStore;
import com.hiriver.streamsource.StreamSource;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.BinlogFileBinlogPosition;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.GTidBinlogPosition;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.TimestampBinlogPosition;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.ValidBinlogOutput;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.event.BaseRowEvent;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.exp.FetalParseValueExp;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.exp.ReadTimeoutExp;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.exp.TableAlreadyModifyExp;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.extra.BinlogPosition;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hiriver/channel/stream/impl/DefaultChannelStream.class */
public class DefaultChannelStream implements ChannelStream {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultChannelStream.class);
    private static final BinlogPositionStoreTrigger NONE_STORE = new BinlogPositionStoreTrigger() { // from class: com.hiriver.channel.stream.impl.DefaultChannelStream.1
        @Override // com.hiriver.channel.stream.BinlogPositionStoreTrigger
        public void triggerStoreBinlogPos() {
        }
    };
    private BinlogPosition configBinlogPos;
    private TransactionRecognizer transactionRecognizer;
    private BinlogPositionStore binlogPositionStore;
    private StreamSource streamSource;
    private Consumer consumer;
    private boolean isSkipCurrentTrans = false;
    private ChannelBuffer channelBuffer = new DefaultChannelBuffer();
    private String channelId = UUID.randomUUID().toString();
    private long faultTolerantTimeout = 5000;
    private long fetalWaitTimeout = 120000;
    private final ChannelStreamContext context = new ChannelStreamContext();

    public ChannelBuffer getChannelBuffer() {
        return this.channelBuffer;
    }

    public void setChannelBuffer(ChannelBuffer channelBuffer) {
        this.channelBuffer = channelBuffer;
    }

    public String getChannelId() {
        return this.channelId;
    }

    public void setChannelId(String str) {
        this.channelId = str;
    }

    public BinlogPosition getConfigBinlogPos() {
        return this.configBinlogPos;
    }

    public void setConfigBinlogPos(BinlogPosition binlogPosition) {
        this.configBinlogPos = binlogPosition;
    }

    public TransactionRecognizer getTransactionRecognizer() {
        return this.transactionRecognizer;
    }

    public void setTransactionRecognizer(TransactionRecognizer transactionRecognizer) {
        this.transactionRecognizer = transactionRecognizer;
    }

    public BinlogPositionStore getBinlogPositionStore() {
        return this.binlogPositionStore;
    }

    public void setBinlogPositionStore(BinlogPositionStore binlogPositionStore) {
        this.binlogPositionStore = binlogPositionStore;
    }

    public Consumer getConsumer() {
        return this.consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public StreamSource getStreamSource() {
        return this.streamSource;
    }

    public void setStreamSource(StreamSource streamSource) {
        this.streamSource = streamSource;
    }

    public long getFaultTolerantTimeout() {
        return this.faultTolerantTimeout;
    }

    public void setFaultTolerantTimeout(long j) {
        this.faultTolerantTimeout = j;
    }

    public long getFetalWaitTimeout() {
        return this.fetalWaitTimeout;
    }

    public void setFetalWaitTimeout(long j) {
        this.fetalWaitTimeout = j;
    }

    @Override // com.hiriver.channel.stream.ChannelStream
    @PostConstruct
    public void start() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        createProviderThread(countDownLatch);
        createConsumerThread(countDownLatch2);
        try {
            countDownLatch.await(30000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
        try {
            countDownLatch2.await(30000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e2) {
        }
    }

    private void createProviderThread(final CountDownLatch countDownLatch) {
        Thread thread = new Thread(new Runnable() { // from class: com.hiriver.channel.stream.impl.DefaultChannelStream.2
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
                DefaultChannelStream.this.providerThreadCore();
            }
        });
        thread.setName("Provider-" + this.channelId + thread.getId());
        thread.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void providerThreadCore() {
        initIfNeed();
        while (!this.context.shutDownTrigger) {
            if (ensureOpenStream()) {
                try {
                    ValidBinlogOutput readValidInfo = this.streamSource.readValidInfo();
                    if (readValidInfo != null) {
                        if (this.transactionRecognizer.isStart(readValidInfo)) {
                            ensureDispatch(new DefaultBufferableBinlogDataSet(BinlogDataSet.createStartTransEvent(this.channelId, this.streamSource.getHostUrl(), this.transactionRecognizer.getGTId(), this.transactionRecognizer.getTransBinlogPos())));
                            LOG.debug("{},start trans {}", this.channelId, this.transactionRecognizer.mo1getCurrentTransBeginPos());
                        }
                        if (this.transactionRecognizer.tryRecognizePos(readValidInfo)) {
                            LOG.debug("{},recognize pos {}", this.channelId, this.transactionRecognizer.mo1getCurrentTransBeginPos());
                        }
                        if (readValidInfo.isRowEvent()) {
                            if (this.isSkipCurrentTrans) {
                                LOG.info("{},skip row event of {}", this.channelId, this.transactionRecognizer.mo1getCurrentTransBeginPos());
                            } else {
                                LOG.debug("{},dispatch row event of {}", this.channelId, this.transactionRecognizer.mo1getCurrentTransBeginPos());
                                ensureDispatch(convert(readValidInfo));
                            }
                        } else if (this.transactionRecognizer.isEnd(readValidInfo)) {
                            this.context.setNextPos(this.transactionRecognizer.mo1getCurrentTransBeginPos());
                            if (this.isSkipCurrentTrans) {
                                LOG.info("{},skip position event of {}", this.channelId, this.transactionRecognizer.mo1getCurrentTransBeginPos());
                                this.isSkipCurrentTrans = false;
                            } else {
                                ensureDispatch(createPersistPosBufferableBinlogDataSet(this.transactionRecognizer.mo1getCurrentTransBeginPos()));
                            }
                            LOG.debug("{},end trans, {}", this.channelId, this.transactionRecognizer.mo1getCurrentTransBeginPos());
                        }
                    }
                } catch (ReadTimeoutExp e) {
                    LOG.info("channelId is {},read timeout,maybe meet network error.try to reopen.", this.channelId);
                    this.streamSource.release();
                } catch (RuntimeException e2) {
                    LOG.info("channelId is " + this.channelId + ",meet unknown error.", e2);
                    this.streamSource.release();
                } catch (TableAlreadyModifyExp | FetalParseValueExp e3) {
                    LOG.error("table has been modified or parse column error.", e3);
                    safeSleep(this.fetalWaitTimeout);
                }
            }
        }
        this.context.shutDownProviderEvent.countDown();
    }

    private void initIfNeed() {
        if (this.transactionRecognizer != null || this.configBinlogPos == null) {
            return;
        }
        if (this.configBinlogPos instanceof GTidBinlogPosition) {
            this.transactionRecognizer = new GTIDTransactionRecognizer();
        } else if (this.configBinlogPos instanceof BinlogFileBinlogPosition) {
            this.transactionRecognizer = new BinlogNameAndPosTransactionRecognizer();
        } else if (this.configBinlogPos instanceof TimestampBinlogPosition) {
            this.transactionRecognizer = new TimestampTransactionRecognizer();
        }
    }

    private void createConsumerThread(final CountDownLatch countDownLatch) {
        Thread thread = new Thread(new Runnable() { // from class: com.hiriver.channel.stream.impl.DefaultChannelStream.3
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
                DefaultChannelStream.this.consumerCore();
            }
        });
        thread.setName("Consumer-" + this.channelId + thread.getId());
        thread.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void consumerCore() {
        while (!this.context.shutDownTrigger) {
            BufferableBinlogDataSet pop = this.channelBuffer.pop(1000L, TimeUnit.MILLISECONDS);
            if (pop != null) {
                this.consumer.consume(pop.getBinlogDataSet(), createBinlogPositionStoreTrigger(pop));
            }
        }
        this.context.shutDownConsumerEvent.countDown();
    }

    private boolean isGtId() {
        return this.configBinlogPos instanceof GTidBinlogPosition;
    }

    private BinlogPositionStoreTrigger createBinlogPositionStoreTrigger(BufferableBinlogDataSet bufferableBinlogDataSet) {
        if (!(bufferableBinlogDataSet instanceof PersistPosBufferableBinlogDataSet)) {
            return NONE_STORE;
        }
        final PersistPosBufferableBinlogDataSet persistPosBufferableBinlogDataSet = (PersistPosBufferableBinlogDataSet) bufferableBinlogDataSet;
        return new BinlogPositionStoreTrigger() { // from class: com.hiriver.channel.stream.impl.DefaultChannelStream.4
            @Override // com.hiriver.channel.stream.BinlogPositionStoreTrigger
            public void triggerStoreBinlogPos() {
                DefaultChannelStream.this.binlogPositionStore.store(persistPosBufferableBinlogDataSet.getPos(), persistPosBufferableBinlogDataSet.getBinlogDataSet().getChannelId());
            }
        };
    }

    private void ensureDispatch(BufferableBinlogDataSet bufferableBinlogDataSet) {
        while (!this.context.shutDownTrigger && !this.channelBuffer.push(bufferableBinlogDataSet, 1000L, TimeUnit.MILLISECONDS)) {
        }
    }

    private PersistPosBufferableBinlogDataSet createPersistPosBufferableBinlogDataSet(BinlogPosition binlogPosition) {
        return new PersistPosBufferableBinlogDataSet(BinlogDataSet.createPositionStoreTrigger(this.channelId, this.streamSource.getHostUrl(), this.transactionRecognizer.getGTId(), this.transactionRecognizer.getTransBinlogPos()), binlogPosition);
    }

    private BufferableBinlogDataSet convert(ValidBinlogOutput validBinlogOutput) {
        BinlogDataSet binlogDataSet = new BinlogDataSet(this.channelId, this.streamSource.getHostUrl(), this.transactionRecognizer.getGTId(), this.transactionRecognizer.getTransBinlogPos());
        BaseRowEvent rowEvent = validBinlogOutput.getRowEvent();
        String fullTableName = rowEvent.getFullTableName();
        rowEvent.getColumnDefinitionList();
        if (!binlogDataSet.getColumnDefMap().containsKey(fullTableName)) {
            ArrayList arrayList = new ArrayList(rowEvent.getColumnDefinitionList().size());
            arrayList.addAll(rowEvent.getColumnDefinitionList());
            binlogDataSet.getColumnDefMap().put(fullTableName, arrayList);
        }
        if (binlogDataSet.getRowDataMap().containsKey(fullTableName)) {
            binlogDataSet.getRowDataMap().get(fullTableName).addAll(rowEvent.getRowList());
        } else {
            LinkedList linkedList = new LinkedList();
            linkedList.addAll(rowEvent.getRowList());
            binlogDataSet.getRowDataMap().put(fullTableName, linkedList);
        }
        return new DefaultBufferableBinlogDataSet(binlogDataSet);
    }

    private boolean ensureOpenStream() {
        if (this.streamSource.isOpen()) {
            return true;
        }
        try {
            this.streamSource.openStream(loadBinlogPosistion());
            return true;
        } catch (RuntimeException e) {
            LOG.error("ensureOpenStream error.", e);
            safeSleep(this.faultTolerantTimeout);
            return false;
        }
    }

    private void safeSleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    private BinlogPosition loadBinlogPosistion() {
        boolean isGtId = isGtId();
        this.isSkipCurrentTrans = isGtId;
        if (this.context.getNextPos() != null) {
            LOG.info("load binlog position {} from mem,channelId is {}.", this.context.getNextPos().toString(), this.channelId);
            return adaptConfigPos(this.context.getNextPos());
        }
        BinlogPosition load = this.binlogPositionStore.load(this.channelId);
        if (load != null) {
            if (!isGtId && (load instanceof GTidBinlogPosition)) {
                throw new RuntimeException("stored binlogPosition is not matched.");
            }
            LOG.info("load binlog position {} from store,channelId is {}.", load.toString(), this.channelId);
            setupInitGtIdPos(load);
            return adaptConfigPos(load);
        }
        if (this.configBinlogPos == null) {
            throw new RuntimeException("can not find binlog position.");
        }
        LOG.info("load binlog position {} from configure,channelId is {}.", this.configBinlogPos.toString(), this.channelId);
        this.isSkipCurrentTrans = false;
        setupInitGtIdPos(this.configBinlogPos);
        return adaptConfigPos(this.configBinlogPos);
    }

    private BinlogPosition adaptConfigPos(BinlogPosition binlogPosition) {
        return !isGtId() ? binlogPosition : ((GTidBinlogPosition) binlogPosition).fixConfPos();
    }

    private void setupInitGtIdPos(BinlogPosition binlogPosition) {
        if (isGtId()) {
            ((GTIDTransactionRecognizer) this.transactionRecognizer).useInitPos((GTidBinlogPosition) binlogPosition);
        }
    }

    @Override // com.hiriver.channel.stream.ChannelStream
    @PreDestroy
    public void release() {
        this.context.shutDownTrigger = true;
        try {
            this.context.shutDownProviderEvent.await(30000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
        try {
            this.context.shutDownConsumerEvent.await(30000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e2) {
        }
    }
}
