package com.github.pandaxz.events.replication;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Clock;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.postgresql.PGConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/pandaxz/events/replication/ReplicationStream.class */
/**
 * Wraps a PostgreSQL logical replication stream bound to a single replication slot that uses
 * the {@value #OUTPUT_PLUGIN} output plugin.
 *
 * <p>The class offers slot management ({@link #createSlot()}, {@link #dropSlot()},
 * {@link #slotExists()}), non-blocking message retrieval ({@link #receive()}) and LSN
 * acknowledgement ({@link #commit(LogSequenceNumber, LogSequenceNumber)}).
 *
 * <p>NOTE(review): the stream holder is built once in the constructor from the slot name and
 * replication connection source passed there. {@link #setSlotName(String)} and
 * {@link #setReplicationConnectionSource(ReplicationConnectionSource)} only change the values
 * used by the slot-management and error-handling methods of this class; they do not affect
 * the already-created stream holder.
 */
public class ReplicationStream implements Closeable {
    public static final String OUTPUT_PLUGIN = "wal2json";
    private static final Logger logger = LoggerFactory.getLogger(ReplicationStream.class);

    /** Parameterized lookup of a slot by name; the name is bound, never concatenated. */
    private static final String SLOT_EXISTS_QUERY =
            "SELECT count(*)>0 FROM pg_replication_slots where slot_name = ?";

    private String slotName;
    private ReplicationConnectionSource replicationConnectionSource;
    private DataSource connectionSource;
    private final ReplicationStreamSource streamHolder;

    /**
     * Lazily creates and caches the {@link PGReplicationStream} for one slot, re-creating it
     * when the cached stream is missing, closed, or a reconnect has been flagged.
     *
     * <p>Declared static because it keeps its own copies of everything it needs and never
     * touches the enclosing {@link ReplicationStream} instance.
     */
    private static final class ReplicationStreamSource {
        private final List<String> tables;
        private final ReplicationConnectionSource replicationConnectionSource;
        private final String slotName;
        private PGReplicationStream stream;
        // Written by the subscriber callback registered in the constructor; presumably that
        // callback may run on another thread, hence volatile (as in the original code).
        private volatile boolean reconnectRequired = true;
        // Start position for the next stream that gets created; null until first commit.
        private LogSequenceNumber nextLsn;

        /**
         * @param slotName name of the replication slot to stream from
         * @param connectionSource provider of the replication connection; a subscriber is
         *     registered on it that flags this holder for reconnect
         * @param tables table names passed to the plugin's {@code add-tables} option
         */
        ReplicationStreamSource(String slotName, ReplicationConnectionSource connectionSource, List<String> tables) {
            this.tables = tables;
            this.replicationConnectionSource = connectionSource;
            this.replicationConnectionSource.registerSubscriber(() -> {
                this.reconnectRequired = true;
            });
            this.slotName = slotName;
        }

        /**
         * Returns the cached stream, creating a new one first if required.
         *
         * @return an open replication stream
         * @throws SQLException if the connection cannot be obtained or the stream cannot start
         */
        PGReplicationStream getStream() throws SQLException {
            // Fetch the connection before inspecting the flag: obtaining it may itself
            // trigger the subscriber callback that sets reconnectRequired.
            PGConnection connection = this.replicationConnectionSource.getConnection();
            if (this.stream == null || this.stream.isClosed() || this.reconnectRequired) {
                this.stream = createReplicationStream(connection);
                this.reconnectRequired = false;
            }
            return this.stream;
        }

        /** Builds and starts a logical replication stream on the given connection. */
        private PGReplicationStream createReplicationStream(PGConnection connection) throws SQLException {
            ChainedLogicalStreamBuilder builder =
                    connection.getReplicationAPI().replicationStream().logical().withSlotName(this.slotName);
            // Resume from the last committed position only when a valid one is known.
            if (this.nextLsn != null && !this.nextLsn.equals(LogSequenceNumber.INVALID_LSN)) {
                builder.withStartPosition(this.nextLsn);
            }
            return builder
                    .withSlotOption("include-xids", true)
                    .withSlotOption("pretty-print", true)
                    .withSlotOption("include-timestamp", true)
                    .withSlotOption("include-types", false)
                    .withSlotOption("include-lsn", true)
                    .withSlotOption("add-tables", String.join(", ", this.tables))
                    .withStatusInterval(15, TimeUnit.SECONDS)
                    .start();
        }

        /**
         * Closes the cached stream if it is open and flags the holder for reconnect.
         *
         * @throws SQLException if closing the stream fails
         */
        void closeStream() throws SQLException {
            if (this.stream == null || this.stream.isClosed()) {
                return;
            }
            this.stream.close();
            this.reconnectRequired = true;
        }

        /** Records the position from which the next created stream should start. */
        void commitNextLsn(LogSequenceNumber lsn) {
            this.nextLsn = lsn;
        }
    }

    /**
     * @param str name of the replication slot
     * @param replicationConnectionSource provider of the replication connection
     * @param dataSource regular JDBC data source, used to query {@code pg_replication_slots}
     * @param list table names passed to the output plugin's {@code add-tables} option
     */
    public ReplicationStream(String str, ReplicationConnectionSource replicationConnectionSource, DataSource dataSource, List<String> list) {
        this.slotName = str;
        this.replicationConnectionSource = replicationConnectionSource;
        this.connectionSource = dataSource;
        this.streamHolder = new ReplicationStreamSource(str, replicationConnectionSource, list);
    }

    /**
     * Reads the next pending message from the replication stream without blocking.
     *
     * @return the received event, or {@code null} when nothing is pending or when an error
     *     occurred (in which case the replication connection is invalidated)
     */
    public ReplicationEvent receive() {
        logger.trace("Receive message");
        try {
            PGReplicationStream stream = this.streamHolder.getStream();
            ByteBuffer pending = stream.readPending();
            if (pending == null) {
                return null;
            }
            Instant receivedAt = Instant.now(Clock.systemUTC());
            LogSequenceNumber lastReceiveLsn = stream.getLastReceiveLSN();
            int offset = pending.arrayOffset();
            byte[] source = pending.array();
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String payload = new String(source, offset, source.length - offset, StandardCharsets.UTF_8);
            return new ReplicationEvent(payload, receivedAt, lastReceiveLsn);
        } catch (Exception e) {
            logger.info("Error when receiving: ", e);
            this.replicationConnectionSource.invalidateConnection();
            return null;
        }
    }

    /**
     * Acknowledges processing up to the given position and forces a status update.
     *
     * @param logSequenceNumber position reported as both applied and flushed
     * @param logSequenceNumber2 start position for the next stream to be created; ignored
     *     when {@code null}
     * @return {@code true} if the status update was sent, {@code false} on any failure
     */
    public boolean commit(LogSequenceNumber logSequenceNumber, LogSequenceNumber logSequenceNumber2) {
        if (logSequenceNumber2 != null) {
            this.streamHolder.commitNextLsn(logSequenceNumber2);
        }
        try {
            PGReplicationStream stream = this.streamHolder.getStream();
            stream.setAppliedLSN(logSequenceNumber);
            stream.setFlushedLSN(logSequenceNumber);
            return silentlyUpdateStatus(stream);
        } catch (SQLException e) {
            // Previously swallowed without a trace; keep the contract but leave evidence.
            logger.warn("Error when committing LSN {}", logSequenceNumber, e);
            return false;
        }
    }

    /** Invalidates the replication connection held by the connection source. */
    public void resetUncommitted() {
        this.replicationConnectionSource.invalidateConnection();
    }

    /**
     * Drops the replication slot if it exists.
     *
     * @return always {@code true} when no exception is thrown
     * @throws SQLException if the existence check or the drop fails; on a failed drop the
     *     replication connection is invalidated first
     */
    public boolean dropSlot() throws SQLException {
        if (!slotExists()) {
            return true;
        }
        try {
            this.replicationConnectionSource.getConnection().getReplicationAPI().dropReplicationSlot(this.slotName);
            logger.debug("{} - slot is dropped", this.slotName);
            return true;
        } catch (SQLException e) {
            this.replicationConnectionSource.invalidateConnection();
            throw e;
        }
    }

    /**
     * Creates the logical replication slot with the {@value #OUTPUT_PLUGIN} plugin unless it
     * already exists.
     *
     * @return always {@code true} when no exception is thrown
     * @throws SQLException if the existence check or the creation fails; on a failed creation
     *     the replication connection is invalidated first
     */
    public boolean createSlot() throws SQLException {
        if (slotExists()) {
            return true;
        }
        try {
            this.replicationConnectionSource.getConnection().getReplicationAPI()
                    .createReplicationSlot()
                    .logical()
                    .withSlotName(this.slotName)
                    .withOutputPlugin(OUTPUT_PLUGIN)
                    .make();
            logger.debug("{} - slot is created", this.slotName);
            return true;
        } catch (SQLException e) {
            this.replicationConnectionSource.invalidateConnection();
            throw e;
        }
    }

    /**
     * Checks {@code pg_replication_slots} for a slot with the configured name.
     *
     * @return {@code true} if such a slot exists
     * @throws SQLException if the query fails
     */
    public boolean slotExists() throws SQLException {
        // Bind the slot name as a parameter (no SQL injection) and let try-with-resources
        // close the result set, statement and connection in every outcome.
        try (Connection connection = this.connectionSource.getConnection();
                PreparedStatement statement = connection.prepareStatement(SLOT_EXISTS_QUERY)) {
            statement.setString(1, this.slotName);
            try (ResultSet resultSet = statement.executeQuery()) {
                return resultSet.next() && resultSet.getBoolean(1);
            }
        }
    }

    /**
     * Forces a status update on the stream, converting any failure into {@code false}.
     *
     * @return {@code true} on success; {@code false} if the stream is {@code null} or the
     *     update throws (the replication connection is then invalidated)
     */
    private boolean silentlyUpdateStatus(PGReplicationStream pGReplicationStream) {
        if (pGReplicationStream == null) {
            return false;
        }
        try {
            pGReplicationStream.forceUpdateStatus();
            return true;
        } catch (Exception e) {
            this.replicationConnectionSource.invalidateConnection();
            logger.error("error to update status", e);
            return false;
        }
    }

    /** Sets the slot name used by the slot-management methods; the stream holder keeps its own. */
    public void setSlotName(String str) {
        this.slotName = str;
    }

    /** Sets the connection source used by this class; the stream holder keeps its own. */
    public void setReplicationConnectionSource(ReplicationConnectionSource replicationConnectionSource) {
        this.replicationConnectionSource = replicationConnectionSource;
    }

    /** Sets the data source used by {@link #slotExists()}. */
    public void setConnectionSource(DataSource dataSource) {
        this.connectionSource = dataSource;
    }

    /** @return the slot name used by the slot-management methods */
    public String getSlotName() {
        return this.slotName;
    }

    /** @return the replication connection source used by this class */
    public ReplicationConnectionSource getReplicationConnectionSource() {
        return this.replicationConnectionSource;
    }

    /** @return the data source used by {@link #slotExists()} */
    public DataSource getConnectionSource() {
        return this.connectionSource;
    }

    /**
     * Closes the underlying replication stream if it is open. Closing is best-effort: a
     * failure is logged and not propagated.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.streamHolder.closeStream();
        } catch (SQLException e) {
            logger.warn("Error when closing replication stream", e);
        }
    }
}
