package org.apache.camel.component.pg.replication.slot;

import java.net.SocketException;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledPollConsumer;
import org.postgresql.PGConnection;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;

/* loaded from: input_file:org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.class */
public class PgReplicationSlotConsumer extends ScheduledPollConsumer {
    private final PgReplicationSlotEndpoint endpoint;
    private Connection connection;
    private PGConnection pgConnection;
    private PGReplicationStream replicationStream;
    private ScheduledExecutorService scheduledExecutor;
    private byte[] payload;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PgReplicationSlotConsumer(PgReplicationSlotEndpoint pgReplicationSlotEndpoint, Processor processor) {
        super(pgReplicationSlotEndpoint, processor);
        this.endpoint = pgReplicationSlotEndpoint;
    }

    protected void doStart() throws Exception {
        super.doStart();
        connect();
        if (this.scheduledExecutor == null) {
            this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "PgReplicationStatusUpdateSender");
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
        if (this.scheduledExecutor != null) {
            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.scheduledExecutor);
            this.scheduledExecutor = null;
        }
    }

    protected int poll() throws Exception {
        PGReplicationStream stream = getStream();
        if (stream == null) {
            return 0;
        }
        try {
            if (this.payload == null) {
                ByteBuffer readPending = stream.readPending();
                if (readPending == null) {
                    return 0;
                }
                int arrayOffset = readPending.arrayOffset();
                byte[] array = readPending.array();
                int length = array.length - arrayOffset;
                this.payload = new byte[length];
                System.arraycopy(array, arrayOffset, this.payload, 0, length);
            }
            Exchange createExchange = this.endpoint.createExchange();
            createExchange.setExchangeId(stream.getLastReceiveLSN().asString());
            createExchange.getIn().setBody(this.payload);
            long intValue = this.endpoint.getStatusInterval().intValue();
            final ScheduledFuture<?> scheduleAtFixedRate = this.scheduledExecutor.scheduleAtFixedRate(() -> {
                try {
                    this.log.debug("Processing took too long. Sending status update to avoid disconnect.");
                    stream.forceUpdateStatus();
                } catch (SQLException e) {
                    this.log.error(e.getMessage(), e);
                }
            }, intValue, intValue, TimeUnit.SECONDS);
            createExchange.addOnCompletion(new Synchronization() { // from class: org.apache.camel.component.pg.replication.slot.PgReplicationSlotConsumer.1
                public void onComplete(Exchange exchange) {
                    PgReplicationSlotConsumer.this.processCommit(exchange);
                    scheduleAtFixedRate.cancel(true);
                }

                public void onFailure(Exchange exchange) {
                    PgReplicationSlotConsumer.this.processRollback(exchange);
                    scheduleAtFixedRate.cancel(true);
                }
            });
            getProcessor().process(createExchange);
            return 1;
        } catch (SQLException e) {
            if (e.getCause() instanceof SocketException) {
                this.log.info("Connection to PosgreSQL server has been lost, trying to reconnect.");
                connect();
            }
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processCommit(Exchange exchange) {
        try {
            this.payload = null;
            PGReplicationStream stream = getStream();
            if (stream == null) {
                return;
            }
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
            stream.forceUpdateStatus();
        } catch (SQLException e) {
            getExceptionHandler().handleException("Exception while sending feedback to PostgreSQL.", exchange, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processRollback(Exchange exchange) {
        Exception exception = exchange.getException();
        if (exception != null) {
            getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, exception);
        }
    }

    private void createSlot() throws SQLException {
        this.pgConnection.getReplicationAPI().createReplicationSlot().logical().withSlotName(this.endpoint.getSlot()).withOutputPlugin(this.endpoint.getOutputPlugin()).make();
    }

    private boolean isSlotCreated() throws SQLException {
        String format = String.format("SELECT count(*) FROM pg_replication_slots WHERE slot_name = '%s';", this.endpoint.getSlot());
        Statement createStatement = this.connection.createStatement();
        Throwable th = null;
        try {
            ResultSet executeQuery = createStatement.executeQuery(format);
            Throwable th2 = null;
            try {
                try {
                    executeQuery.next();
                    boolean z = executeQuery.getInt(1) > 0;
                    if (executeQuery != null) {
                        if (0 != 0) {
                            try {
                                executeQuery.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            executeQuery.close();
                        }
                    }
                    return z;
                } finally {
                }
            } catch (Throwable th4) {
                if (executeQuery != null) {
                    if (th2 != null) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        executeQuery.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    createStatement.close();
                }
            }
        }
    }

    private PGReplicationStream getStream() throws SQLException {
        if (this.replicationStream != null && !this.replicationStream.isClosed()) {
            return this.replicationStream;
        }
        if (isSlotActive()) {
            this.log.debug(String.format("Slot: %s is active. Waiting for it to be available.", this.endpoint.getSlot()));
            return null;
        }
        ChainedLogicalStreamBuilder withStatusInterval = this.pgConnection.getReplicationAPI().replicationStream().logical().withSlotName(this.endpoint.getSlot()).withStatusInterval(this.endpoint.getStatusInterval().intValue(), TimeUnit.SECONDS);
        Properties properties = new Properties();
        properties.putAll(this.endpoint.getSlotOptions());
        withStatusInterval.withSlotOptions(properties);
        this.replicationStream = withStatusInterval.start();
        return this.replicationStream;
    }

    private boolean isSlotActive() throws SQLException {
        String format = String.format("SELECT count(*) FROM pg_replication_slots where slot_name = '%s' AND active = true;", this.endpoint.getSlot());
        Statement createStatement = this.connection.createStatement();
        Throwable th = null;
        try {
            ResultSet executeQuery = createStatement.executeQuery(format);
            Throwable th2 = null;
            try {
                try {
                    executeQuery.next();
                    boolean z = executeQuery.getInt(1) > 0;
                    if (executeQuery != null) {
                        if (0 != 0) {
                            try {
                                executeQuery.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            executeQuery.close();
                        }
                    }
                    return z;
                } finally {
                }
            } catch (Throwable th4) {
                if (executeQuery != null) {
                    if (th2 != null) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        executeQuery.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    createStatement.close();
                }
            }
        }
    }

    private void connect() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
        this.connection = this.endpoint.newDbConnection();
        this.pgConnection = (PGConnection) this.connection.unwrap(PGConnection.class);
        this.replicationStream = null;
        if (!this.endpoint.getAutoCreateSlot().booleanValue() || isSlotCreated()) {
            return;
        }
        createSlot();
    }
}
