/*
 * Decompiled with CFR 0.152.
 */
package net.lightoze.jooq.postgresql.notify;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NotificationListener
extends AbstractExecutionThreadService {
    private final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private String[] channels = new String[0];
    private long retryDelayMillis = 5000L;
    private long fetchIntervalMillis = 500L;

    public String[] getChannels() {
        return this.channels;
    }

    public void setChannels(String[] channels) {
        this.channels = channels;
    }

    public long getRetryDelayMillis() {
        return this.retryDelayMillis;
    }

    public void setRetryDelayMillis(long retryDelayMillis) {
        this.retryDelayMillis = retryDelayMillis;
    }

    public long getFetchIntervalMillis() {
        return this.fetchIntervalMillis;
    }

    public void setFetchIntervalMillis(long fetchIntervalMillis) {
        this.fetchIntervalMillis = fetchIntervalMillis;
    }

    protected abstract Connection getConnection() throws SQLException;

    protected abstract void closeConnection(Connection var1) throws SQLException;

    protected abstract void receiveNotification(PGNotification var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void run() throws Exception {
        while (this.isRunning()) {
            Connection conn;
            try {
                conn = this.getConnection();
                conn.setAutoCommit(true);
            }
            catch (Throwable e) {
                this.log.error("Could not get a connection", e);
                Thread.sleep(this.retryDelayMillis);
                continue;
            }
            try {
                for (String channel : this.channels) {
                    Statement stmt = conn.createStatement();
                    stmt.execute("LISTEN " + channel);
                    stmt.close();
                }
                while (this.isRunning()) {
                    Statement stmt = conn.createStatement();
                    stmt.execute("SELECT 1");
                    stmt.close();
                    PGNotification[] notifications = conn.unwrap(PGConnection.class).getNotifications();
                    if (notifications == null || notifications.length == 0) {
                        Thread.sleep(this.fetchIntervalMillis);
                        continue;
                    }
                    for (PGNotification notification : notifications) {
                        try {
                            this.receiveNotification(notification);
                        }
                        catch (Throwable e) {
                            this.log.error("Error processing notification {} ({}) from {}", new Object[]{notification.getName(), notification.getParameter(), notification.getPID(), e});
                        }
                    }
                }
            }
            catch (Throwable e) {
                this.log.error("Error while listening for notifications", e);
                Thread.sleep(this.retryDelayMillis);
            }
            finally {
                try {
                    this.closeConnection(conn);
                }
                catch (Throwable e) {
                    this.log.warn("Error closing connection", e);
                }
            }
        }
    }
}

