package com.hiriver.streamsource.impl;

import com.hiriver.streamsource.DbHostInfo;
import com.hiriver.streamsource.DbHostInfoSupplier;
import com.hiriver.streamsource.StreamSource;
import com.hiriver.unbiz.mysql.lib.BinlogStreamBlockingTransport;
import com.hiriver.unbiz.mysql.lib.BinlogStreamBlockingTransportImpl;
import com.hiriver.unbiz.mysql.lib.TextProtocolBlockingTransport;
import com.hiriver.unbiz.mysql.lib.TextProtocolBlockingTransportImpl;
import com.hiriver.unbiz.mysql.lib.TransportConfig;
import com.hiriver.unbiz.mysql.lib.filter.TableFilter;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.BinlogEvent;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.BinlogFileBinlogPosition;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.TimestampBinlogPosition;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.ValidBinlogOutput;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.exp.ReadTimeoutExp;
import com.hiriver.unbiz.mysql.lib.protocol.binlog.extra.BinlogPosition;
import com.hiriver.unbiz.mysql.lib.protocol.text.ColumnValue;
import com.hiriver.unbiz.mysql.lib.protocol.text.ResultsetRowResponse;
import com.hiriverunbiz.mysql.lib.exp.NotExpectPayloadException;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hiriver/streamsource/impl/TimestampBasedStreamSource.class */
public class TimestampBasedStreamSource implements StreamSource, DbHostInfo {
    private static final Logger LOGGER = LoggerFactory.getLogger(TimestampBasedStreamSource.class);
    // Not referenced by any code visible in this file; the dump helpers use the literal 4L instead.
    // NOTE(review): presumably the offset of the first event in a binlog file - confirm.
    private static final int VALID_BINLOG_START_OFFSET = 4;
    // Supplies the database host that openStream connects to.
    private DbHostInfoSupplier dbHostInfoSupplier;
    // Server id handed to the binlog transport in initTransportByDbHostInfo.
    private int serverId;
    // Table filter handed to the binlog transport in initTransportByDbHostInfo.
    private TableFilter tableFilter;
    // Host and transport of the currently opened stream; null until openStream runs and after release.
    private volatile DbHostAndTransport currentDb;
    // Transport configuration handed to the binlog transport in initTransportByDbHostInfo.
    private TransportConfig transportConfig = new TransportConfig();
    // Handed to the binlog transport via setMaxMaxPacketSize; defaults to 0.
    private int maxMaxPacketSize = 0;
    // Seconds subtracted from the requested timestamp in findRealPosition before searching; defaults to 5 minutes.
    private int advanceSecondsToAvoidMistake = (int) TimeUnit.MINUTES.toSeconds(5);
    // Set to true at the end of a successful openStream and reset to false by release.
    private volatile boolean open = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hiriver/streamsource/impl/TimestampBasedStreamSource$DbHostAndTransport.class */
    /**
     * Bundles the host a stream was opened against, the {@code server_uuid} reported by that
     * host, and the binlog transport used to read from it.
     *
     * <p>Constructing an instance opens a short-lived text-protocol connection to the host in
     * order to query its {@code server_uuid}.
     */
    public class DbHostAndTransport implements Closeable {
        private final DbHostInfo dbHostInfo;
        private final String serverUuid;
        private final BinlogStreamBlockingTransport transport;

        /**
         * @param dbHostInfo host credentials; its host URL must have the form {@code host:port}
         * @param binlogStreamBlockingTransport transport that {@link #close()} will close; may be null
         * @throws IllegalStateException if the server does not report a {@code Value} column for
         *         {@code server_uuid}
         */
        DbHostAndTransport(DbHostInfo dbHostInfo, BinlogStreamBlockingTransport binlogStreamBlockingTransport) {
            this.dbHostInfo = dbHostInfo;
            this.transport = binlogStreamBlockingTransport;
            // fetchUuid() reads this.dbHostInfo, so it must run only after that field is assigned.
            // As a field initializer it would execute before this constructor body and hit a null.
            this.serverUuid = fetchUuid();
        }

        /**
         * Queries {@code server_uuid} over a temporary text-protocol connection, which is always
         * closed before this method returns or throws.
         */
        private String fetchUuid() {
            String[] hostAndPort = this.dbHostInfo.getHostUrl().split(":");
            TextProtocolBlockingTransportImpl textTransport = new TextProtocolBlockingTransportImpl(hostAndPort[0], Integer.parseInt(hostAndPort[1]), this.dbHostInfo.getUserName(), this.dbHostInfo.getPassword());
            try {
                textTransport.open();
                for (ColumnValue columnValue : ((ResultsetRowResponse) textTransport.execute("show variables like \"server_uuid\"").getRowList().get(0)).getValueList()) {
                    if (StringUtils.equals("Value", columnValue.getColumnName())) {
                        return columnValue.getValueAsString();
                    }
                }
                throw new IllegalStateException("fetch uuid fail");
            } finally {
                try {
                    textTransport.close();
                } catch (Exception ignored) {
                    // Best-effort close of a throwaway connection; the query outcome is what matters.
                }
            }
        }

        /** Closes the binlog transport if there is one; failures while closing are ignored. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.transport != null) {
                try {
                    this.transport.close();
                } catch (Exception ignored) {
                    // Best-effort: close() is used for cleanup and must not throw.
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hiriver/streamsource/impl/TimestampBasedStreamSource$DbTarget.class */
    /**
     * Connection target with the {@code host:port} URL of a {@link DbHostInfo} already split
     * into a separate host and numeric port.
     */
    public static class DbTarget implements DbHostInfo {
        private final String host;
        private final int port;
        private final String userName;
        private final String password;

        /**
         * Copies the credentials of {@code dbHostInfo} and splits its host URL on {@code ':'}.
         *
         * @param dbHostInfo source whose host URL is expected to look like {@code host:port}
         */
        public DbTarget(DbHostInfo dbHostInfo) {
            String[] hostAndPort = dbHostInfo.getHostUrl().split(":");
            // The port is parsed first, so a malformed URL fails before any field is populated.
            port = Integer.parseInt(hostAndPort[1]);
            host = hostAndPort[0];
            userName = dbHostInfo.getUserName();
            password = dbHostInfo.getPassword();
        }

        /** Rebuilds the URL as {@code host:port}. */
        @Override // com.hiriver.streamsource.DbHostInfo
        public String getHostUrl() {
            return host + ":" + port;
        }

        /** Returns the user name copied from the source {@link DbHostInfo}. */
        @Override // com.hiriver.streamsource.DbHostInfo
        public String getUserName() {
            return userName;
        }

        /** Returns the password copied from the source {@link DbHostInfo}. */
        @Override // com.hiriver.streamsource.DbHostInfo
        public String getPassword() {
            return password;
        }

        /** Returns the part of the host URL before the first {@code ':'}. */
        public String getHost() {
            return host;
        }

        /** Returns the port parsed from the host URL. */
        public int getPort() {
            return port;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hiriver/streamsource/impl/TimestampBasedStreamSource$ExtractValidBeginTimestampFunction.class */
    /**
     * Callback used by {@code findLastFileStartLeTimestamp} to obtain a timestamp for one entry
     * of the binlog file list. The implementation created in {@code findRealPosition} returns the
     * timestamp of the first event of the named binlog file.
     */
    public interface ExtractValidBeginTimestampFunction {
        /**
         * @param str a binlog file name taken from the list being searched
         * @return the timestamp that is compared against the target timestamp
         */
        long extract(String str);
    }

    /**
     * Opens the binlog stream on the host returned by the configured supplier, starting from
     * the position resolved out of the given timestamp position.
     *
     * @param binlogPosition start position; must be a {@link TimestampBinlogPosition}
     * @throws IllegalStateException if the position has another type, the source is already
     *         open, or the supplier has no available host
     */
    @Override // com.hiriver.streamsource.StreamSource
    public void openStream(BinlogPosition binlogPosition) {
        assertState(binlogPosition instanceof TimestampBinlogPosition, "binlogPos类型非TimestampBinlogPosition，" + binlogPosition);
        assertState(!this.open, "already opened");

        DbHostInfo hostInfo = this.dbHostInfoSupplier.available();
        assertState(hostInfo != null, "no available dbHostInfo");

        // Drop whatever a previous, failed open may have left behind.
        release();

        DbTarget target = new DbTarget(hostInfo);
        BinlogStreamBlockingTransportImpl freshTransport = initTransportByDbHostInfo(target);
        this.currentDb = new DbHostAndTransport(hostInfo, freshTransport);

        BinlogStreamBlockingTransport dumpTransport = this.currentDb.transport;
        TimestampBinlogPosition startPosition = findRealPosition((TimestampBinlogPosition) binlogPosition, target, this.currentDb.serverUuid, this.advanceSecondsToAvoidMistake);
        dumpTransport.dump(startPosition);

        // Only flagged as open once the dump request has gone through.
        this.open = true;
    }

    /**
     * Reads the next output from the current binlog transport and, when there is one, stamps it
     * with the server UUID of the host the stream was opened against.
     *
     * @return the transport's output, or null when the transport returns null
     * @throws ReadTimeoutExp propagated from the transport
     * @throws IllegalStateException if no stream is currently held (never opened, or released)
     */
    @Override // com.hiriver.streamsource.StreamSource
    public ValidBinlogOutput readValidInfo() throws ReadTimeoutExp {
        // Read the volatile field once so a concurrent release() cannot null it between uses.
        DbHostAndTransport db = this.currentDb;
        if (db == null) {
            throw new IllegalStateException("stream source is not opened");
        }
        ValidBinlogOutput output = db.transport.getBinlogOutputImmediately();
        if (output != null) {
            output.setServerUuid(db.serverUuid);
        }
        return output;
    }

    /**
     * Resolves a timestamp position into a position usable on the target server.
     *
     * <p>If the position already carries the target server's UUID it is returned unchanged.
     * Otherwise the timestamp is moved back by {@code j} seconds, the last binlog file whose
     * first event is not later than that moved timestamp is located, and that file is scanned
     * for the matching event position.
     *
     * @param timestampBinlogPosition requested position; its timestamp is compared against the
     *        current time in seconds
     * @param dbTarget server to inspect
     * @param str {@code server_uuid} of the target server
     * @param j number of seconds to move the timestamp back; must be positive
     * @return the resolved position
     * @throws IllegalStateException if the timestamp lies in the future or {@code j} is not positive
     * @throws IllegalArgumentException if no binlog file starts at or before the moved timestamp
     */
    private static TimestampBinlogPosition findRealPosition(TimestampBinlogPosition timestampBinlogPosition, final DbTarget dbTarget, String str, long j) {
        assertState(timestampBinlogPosition.getTimestamp() <= System.currentTimeMillis() / 1000, "时间位点不可晚于当前时间");
        assertState(j > 0, "前拨秒数不可设置为0");
        if (StringUtils.equalsIgnoreCase(timestampBinlogPosition.getServerUuid(), str)) {
            return timestampBinlogPosition;
        }
        long targetTimestamp = timestampBinlogPosition.getTimestamp() - j;
        LOGGER.info("target timestamp position to find:{}", Long.valueOf(targetTimestamp));
        final TextProtocolBlockingTransportImpl textTransport = new TextProtocolBlockingTransportImpl(dbTarget.getHost(), dbTarget.getPort(), dbTarget.getUserName(), dbTarget.getPassword());
        try {
            textTransport.open();
            BinlogFileBinlogPosition endPosition = getEndBinlogFilePos(textTransport);
            LOGGER.info("current binlog position end:{}", endPosition);
            String startFile = findLastFileStartLeTimestamp(targetTimestamp, getAllBinaryLogNames(textTransport), new ExtractValidBeginTimestampFunction() {
                @Override // com.hiriver.streamsource.impl.TimestampBasedStreamSource.ExtractValidBeginTimestampFunction
                public long extract(String binlogFileName) {
                    // Uses the captured dbTarget parameter: this is a static method, so there is
                    // no enclosing DbTarget instance that a qualified "this" could refer to.
                    return TimestampBasedStreamSource.getTimestampOfFirstEvent(dbTarget, textTransport, binlogFileName);
                }
            });
            if (startFile == null) {
                throw new IllegalArgumentException("目标时间戳早于binlog中最早的事件，可能会导致消费binlog事件丢失");
            }
            TimestampBinlogPosition resolved = findPositionLeTargetTimestampInOneBinlogFile(dbTarget, str, targetTimestamp, textTransport, endPosition, startFile);
            LOGGER.info("findRealPosition resultPosition:{} for targetTimestamp:{}", resolved, Long.valueOf(targetTimestamp));
            return resolved;
        } finally {
            try {
                textTransport.close();
            } catch (Exception ignored) {
                // Best-effort close of the helper connection; the lookup result is already decided.
            }
        }
    }

    /**
     * Scans events starting at the beginning of binlog file {@code str2} and returns the position
     * of the last accepted event.
     *
     * <p>The first usable event is always accepted. After that the scan stops as soon as an
     * event is later than {@code j}, or once the accepted event is at or beyond the server's
     * end position {@code binlogFileBinlogPosition}.
     *
     * @param dbTarget server to dump from
     * @param str server UUID stored in the returned position
     * @param j target timestamp
     * @param textProtocolBlockingTransport open text connection used to kill the dump connection afterwards
     * @param binlogFileBinlogPosition end position of the server's binlog
     * @param str2 binlog file to start scanning from
     * @return position of the last accepted event
     */
    private static TimestampBinlogPosition findPositionLeTargetTimestampInOneBinlogFile(DbTarget dbTarget, String str, long j, TextProtocolBlockingTransport textProtocolBlockingTransport, BinlogFileBinlogPosition binlogFileBinlogPosition, String str2) {
        BinlogStreamBlockingTransportImpl dumpTransport = new BinlogStreamBlockingTransportImpl(dbTarget.getHost(), dbTarget.getPort(), dbTarget.getUserName(), dbTarget.getPassword());
        // NOTE(review): a filter that always returns false presumably lets every table through - confirm.
        dumpTransport.setTableFilter(new TableFilter() {
            public boolean filter(String dbName, String tableName) {
                return false;
            }
        });
        // try/finally guarantees exactly one cleanup. Running the cleanup inside the try and again
        // in a catch-all handler would repeat it on an already closed transport if it failed.
        try {
            // NOTE(review): 4 is presumably the offset of the first event in a binlog file - confirm.
            dumpTransport.dump(new BinlogFileBinlogPosition(str2, 4L));
            Long lastTimestamp = null;
            String lastFile = null;
            Long lastPos = null;
            while (true) {
                BinlogEvent event = dumpTransport.readWithoutSpecialEvent();
                if (event == null || event.getOccurTime() <= 0 || event.getBinlogEventPos() <= 0) {
                    // Nothing usable was read; keep reading.
                    continue;
                }
                long occurTime = event.getOccurTime();
                if (lastTimestamp != null && occurTime > j) {
                    // This event is past the target, so the previously accepted one is the answer.
                    break;
                }
                lastTimestamp = Long.valueOf(occurTime);
                lastFile = dumpTransport.currentBinlogFile();
                lastPos = Long.valueOf(event.getBinlogEventPos());
                if (le(binlogFileBinlogPosition.getBinlogFileName(), binlogFileBinlogPosition.getPos(), lastFile, lastPos.longValue())) {
                    LOGGER.info("result pos {}:{} behind/equals end position:{}", new Object[]{lastFile, lastPos, binlogFileBinlogPosition});
                    break;
                }
            }
            return new TimestampBinlogPosition(lastTimestamp.longValue(), str, lastFile, lastPos);
        } finally {
            closeBinlogDump(textProtocolBlockingTransport, dumpTransport);
        }
    }

    /**
     * Finds the last entry of {@code list} whose extracted start timestamp is less than or equal
     * to {@code j}. The search relies on the extracted timestamps being non-decreasing along the list.
     *
     * <p>The newest file is tried first. When {@code j} lies within the last three hours, the
     * file before it is tried next, since recent targets are most likely near the end. Anything
     * else is located by binary search, keeping the number of {@code extract} calls small.
     *
     * @param j target timestamp in seconds
     * @param list binlog file names, oldest first
     * @param extractValidBeginTimestampFunction yields the start timestamp, in seconds, of a file
     * @return the matching file name, or null when the list is empty or every file starts after {@code j}
     */
    static String findLastFileStartLeTimestamp(long j, List<String> list, ExtractValidBeginTimestampFunction extractValidBeginTimestampFunction) {
        if (list.isEmpty()) {
            // No file at all means no file can start at or before the target.
            return null;
        }
        int high = list.size() - 1;
        if (extractValidBeginTimestampFunction.extract(list.get(high)) <= j) {
            return list.get(high);
        }
        // j is in seconds, so the current time has to be converted from milliseconds before the
        // comparison; comparing against raw milliseconds would never be true for real timestamps.
        long nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        if (high > 0 && j + TimeUnit.HOURS.toSeconds(3L) >= nowSeconds) {
            high--;
            if (extractValidBeginTimestampFunction.extract(list.get(high)) <= j) {
                return list.get(high);
            }
        }
        // From here on list.get(high) is known to start after j.
        if (high == 0 || extractValidBeginTimestampFunction.extract(list.get(0)) > j) {
            return null;
        }
        // Invariant: list.get(low) starts at or before j, list.get(high) starts after j.
        int low = 0;
        while (true) {
            int mid = (low + high) >>> 1;
            if (mid <= low) {
                return list.get(mid);
            }
            if (extractValidBeginTimestampFunction.extract(list.get(mid)) <= j) {
                low = mid;
            } else {
                high = mid;
            }
        }
    }

    /**
     * Builds a binlog transport for the given target and applies this source's server id,
     * transport configuration, table filter and maximum packet size setting to it.
     *
     * @param dbTarget host, port and credentials to connect with
     * @return the configured transport
     */
    private BinlogStreamBlockingTransportImpl initTransportByDbHostInfo(DbTarget dbTarget) {
        String host = dbTarget.getHost();
        int port = dbTarget.getPort();
        String userName = dbTarget.getUserName();
        String password = dbTarget.getPassword();

        BinlogStreamBlockingTransportImpl configured = new BinlogStreamBlockingTransportImpl(host, port, userName, password);
        configured.setServerId(getServerId());
        configured.setTransportConfig(getTransportConfig());
        configured.setTableFilter(getTableFilter());
        configured.setMaxMaxPacketSize(getMaxMaxPacketSize());
        return configured;
    }

    /**
     * Runs {@code show master status} and builds a position from the {@code File} and
     * {@code Position} columns of the first result row.
     *
     * @param textProtocolBlockingTransport open text connection to query
     * @return the file name and position reported by the server
     * @throws IllegalStateException if either column is missing or null
     */
    private static BinlogFileBinlogPosition getEndBinlogFilePos(TextProtocolBlockingTransport textProtocolBlockingTransport) {
        ResultsetRowResponse statusRow = (ResultsetRowResponse) textProtocolBlockingTransport.execute("show master status").getRowList().get(0);
        String lastFileName = null;
        Long lastPos = null;
        for (ColumnValue column : statusRow.getValueList()) {
            if (StringUtils.equalsIgnoreCase("File", column.getColumnName())) {
                lastFileName = column.getValueAsString();
                continue;
            }
            if (StringUtils.equalsIgnoreCase("Position", column.getColumnName())) {
                lastPos = column.getValueAsLong();
            }
        }
        assertState(lastFileName != null, "null lastFileName");
        assertState(lastPos != null, "null lastPos");
        return new BinlogFileBinlogPosition(lastFileName, lastPos.longValue());
    }

    /**
     * Runs {@code show binary logs} and collects the {@code Log_name} value of every row, in
     * result order.
     *
     * @param textProtocolBlockingTransport open text connection to query
     * @return the binlog file names; never empty
     * @throws IllegalStateException if no file name was found
     */
    private static List<String> getAllBinaryLogNames(TextProtocolBlockingTransport textProtocolBlockingTransport) {
        List<String> logNames = new ArrayList<String>();
        for (Object row : textProtocolBlockingTransport.execute("show binary logs").getRowList()) {
            for (ColumnValue column : ((ResultsetRowResponse) row).getValueList()) {
                if (!StringUtils.equals("Log_name", column.getColumnName())) {
                    continue;
                }
                logNames.add(column.getValueAsString());
            }
        }
        assertState(!logNames.isEmpty(), "empty binaryLogFiles");
        return logNames;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dumps binlog file {@code str} from its beginning and returns the timestamp of the first
     * event that reports a positive occur time. The temporary dump connection is torn down
     * before this method returns or throws.
     *
     * @param dbTarget server to dump from
     * @param textProtocolBlockingTransport open text connection used to kill the dump connection afterwards
     * @param str binlog file name
     * @return occur time of the first event with a positive timestamp
     */
    public static long getTimestampOfFirstEvent(DbTarget dbTarget, TextProtocolBlockingTransport textProtocolBlockingTransport, String str) {
        BinlogStreamBlockingTransportImpl dumpTransport = new BinlogStreamBlockingTransportImpl(dbTarget.getHost(), dbTarget.getPort(), dbTarget.getUserName(), dbTarget.getPassword());
        // NOTE(review): a filter that always returns false presumably lets every table through - confirm.
        dumpTransport.setTableFilter(new TableFilter() {
            public boolean filter(String dbName, String tableName) {
                return false;
            }
        });
        // try/finally guarantees exactly one cleanup. Running the cleanup inside the try and again
        // in a catch-all handler would repeat it on an already closed transport if it failed.
        try {
            // NOTE(review): 4 is presumably the offset of the first event in a binlog file - confirm.
            dumpTransport.dump(new BinlogFileBinlogPosition(str, 4L));
            while (true) {
                BinlogEvent event = dumpTransport.readWithoutSpecialEvent();
                if (event != null && event.getOccurTime() > 0) {
                    LOGGER.info("binlogFile:{}, first event timestamp:{}", str, Long.valueOf(event.getOccurTime()));
                    return event.getOccurTime();
                }
            }
        } finally {
            closeBinlogDump(textProtocolBlockingTransport, dumpTransport);
        }
    }

    /**
     * Closes a temporary dump transport and then asks the server, over the text connection, to
     * kill the connection the dump was using.
     *
     * @param textProtocolBlockingTransport open text connection that issues the {@code KILL}
     * @param binlogStreamBlockingTransportImpl dump transport to close
     */
    private static void closeBinlogDump(TextProtocolBlockingTransport textProtocolBlockingTransport, BinlogStreamBlockingTransportImpl binlogStreamBlockingTransportImpl) {
        // The connection id has to be captured before the transport is closed.
        Long connectionId = binlogStreamBlockingTransportImpl.mysqlConnectionId();
        binlogStreamBlockingTransportImpl.close();
        if (connectionId == null) {
            return;
        }
        String killStatement = "KILL CONNECTION " + connectionId;
        try {
            textProtocolBlockingTransport.execute(killStatement);
        } catch (NotExpectPayloadException ignored) {
            // An unexpected reply to KILL is tolerated; the transport is already closed locally.
        }
    }

    /**
     * Guard helper: does nothing when {@code z} holds, otherwise fails with the given message.
     *
     * @param z condition that must be true
     * @param str message of the exception thrown when the condition is false
     * @throws IllegalStateException if {@code z} is false
     */
    private static void assertState(boolean z, String str) {
        if (z) {
            return;
        }
        throw new IllegalStateException(str);
    }

    /**
     * Tells whether binlog position ({@code str}, {@code j}) is less than or equal to position
     * ({@code str2}, {@code j2}). Files are ordered by the integer after the last {@code '.'} of
     * their name; offsets only matter when both names carry the same number.
     *
     * @param str first binlog file name
     * @param j offset within the first file
     * @param str2 second binlog file name
     * @param j2 offset within the second file
     * @return true when the first position does not come after the second
     */
    private static boolean le(String str, long j, String str2, long j2) {
        int firstSequence = Integer.parseInt(StringUtils.substringAfterLast(str, "."));
        int secondSequence = Integer.parseInt(StringUtils.substringAfterLast(str2, "."));
        if (firstSequence != secondSequence) {
            return firstSequence < secondSequence;
        }
        return j <= j2;
    }

    /**
     * Returns the host URL of the host the stream is currently bound to, or null when no
     * stream is held.
     */
    @Override // com.hiriver.streamsource.StreamSource, com.hiriver.streamsource.DbHostInfo
    public String getHostUrl() {
        // Read the volatile field once so a concurrent release() cannot null it after the check.
        DbHostAndTransport db = this.currentDb;
        return db == null ? null : db.dbHostInfo.getHostUrl();
    }

    /**
     * Returns the user name of the host the stream is currently bound to, or null when no
     * stream is held.
     */
    @Override // com.hiriver.streamsource.DbHostInfo
    public String getUserName() {
        // Read the volatile field once so a concurrent release() cannot null it after the check.
        DbHostAndTransport db = this.currentDb;
        return db == null ? null : db.dbHostInfo.getUserName();
    }

    /**
     * Returns the password of the host the stream is currently bound to, or null when no
     * stream is held.
     */
    @Override // com.hiriver.streamsource.DbHostInfo
    public String getPassword() {
        // Read the volatile field once so a concurrent release() cannot null it after the check.
        DbHostAndTransport db = this.currentDb;
        return db == null ? null : db.dbHostInfo.getPassword();
    }

    /**
     * Closes the current host/transport pair, if any, forgets it, and marks this source as not
     * open. Calling it when nothing is held only resets the open flag.
     */
    @Override // com.hiriver.streamsource.StreamSource
    public void release() {
        // Read the volatile field once so a concurrent release() cannot null it after the check.
        DbHostAndTransport db = this.currentDb;
        if (db != null) {
            db.close();
            this.currentDb = null;
        }
        this.open = false;
    }

    /** Reports whether the last {@code openStream} completed and {@code release} has not run since. */
    @Override // com.hiriver.streamsource.StreamSource
    public boolean isOpen() {
        return open;
    }

    /** Returns the transport configuration applied to binlog transports created by this source. */
    public TransportConfig getTransportConfig() {
        return transportConfig;
    }

    /**
     * Replaces the transport configuration applied to binlog transports created by this source.
     *
     * @param transportConfig configuration to use from now on
     */
    public void setTransportConfig(TransportConfig transportConfig) {
        this.transportConfig = transportConfig;
    }

    /** Returns the server id applied to binlog transports created by this source. */
    public int getServerId() {
        return serverId;
    }

    /**
     * Sets the server id applied to binlog transports created by this source.
     *
     * @param i server id to use from now on
     */
    public void setServerId(int i) {
        serverId = i;
    }

    /** Returns the value passed to {@code setMaxMaxPacketSize} on binlog transports created by this source. */
    public int getMaxMaxPacketSize() {
        return maxMaxPacketSize;
    }

    /**
     * Sets the value passed to {@code setMaxMaxPacketSize} on binlog transports created by this source.
     *
     * @param i value to use from now on
     */
    public void setMaxMaxPacketSize(int i) {
        maxMaxPacketSize = i;
    }

    /** Returns the table filter applied to binlog transports created by this source. */
    public TableFilter getTableFilter() {
        return tableFilter;
    }

    /**
     * Replaces the table filter applied to binlog transports created by this source.
     *
     * @param tableFilter filter to use from now on
     */
    public void setTableFilter(TableFilter tableFilter) {
        this.tableFilter = tableFilter;
    }

    /** Returns the supplier that {@code openStream} asks for an available host. */
    public DbHostInfoSupplier getDbHostInfoSupplier() {
        return dbHostInfoSupplier;
    }

    /**
     * Replaces the supplier that {@code openStream} asks for an available host.
     *
     * @param dbHostInfoSupplier supplier to use from now on
     */
    public void setDbHostInfoSupplier(DbHostInfoSupplier dbHostInfoSupplier) {
        this.dbHostInfoSupplier = dbHostInfoSupplier;
    }

    /** Returns how many seconds the requested timestamp is moved back before the binlog search. */
    public int getAdvanceSecondsToAvoidMistake() {
        return advanceSecondsToAvoidMistake;
    }

    /**
     * Sets how many seconds the requested timestamp is moved back before the binlog search.
     *
     * @param i number of seconds; {@code openStream} rejects values that are not positive
     */
    public void setAdvanceSecondsToAvoidMistake(int i) {
        advanceSecondsToAvoidMistake = i;
    }
}
