package ru.vyukov.stomp;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.socket.messaging.WebSocketStompClient;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:ru/vyukov/stomp/StompMessageChannel.class */
public class StompMessageChannel implements MessageChannel, SmartInitializingSingleton {
    private static final Logger log = LoggerFactory.getLogger(StompMessageChannel.class);
    private final WebSocketStompClient webSocketStompClient;
    private final TaskScheduler taskScheduler;
    private volatile ListenableFuture<StompSession> sessionFuture;
    private final ConnectConfig connectConfig;
    private volatile Future<?> reconnectTaskFuture;
    private volatile boolean connected;

    public StompMessageChannel(WebSocketStompClient webSocketStompClient, ConnectConfig connectConfig, TaskScheduler taskScheduler) {
        this.webSocketStompClient = webSocketStompClient;
        this.connectConfig = connectConfig;
        this.taskScheduler = taskScheduler;
    }

    public void start() {
        if (null != this.reconnectTaskFuture) {
            throw new IllegalStateException("Already started");
        }
        connect();
        this.reconnectTaskFuture = this.taskScheduler.scheduleWithFixedDelay(createReconnectTask(), this.connectConfig.getReconnectDelay());
    }

    public void stop() {
        if (null == this.connectConfig) {
            throw new IllegalStateException("Not started");
        }
        this.reconnectTaskFuture.cancel(true);
        this.webSocketStompClient.stop();
    }

    public boolean isConnected() {
        return this.connected;
    }

    protected void connect() {
        if (null != this.sessionFuture) {
            destroySessionFuture(this.sessionFuture);
        }
        ListenableFuture<StompSession> connect = this.webSocketStompClient.connect(this.connectConfig.getUrl(), this.connectConfig.getHandshakeHeaders(), this.connectConfig.getSessionHandler(), new Object[0]);
        connect.addCallback(createSetConnectedCallback(connect));
        this.sessionFuture = connect;
    }

    public boolean send(Message<?> message) {
        return send(message, -1L);
    }

    public boolean send(Message<?> message, long j) {
        try {
            getSessionWithTimeout(j).send((String) message.getHeaders().get("simpDestination"), message.getPayload());
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ListenableFuture<StompSession> getSessionFuture() {
        if (null == this.sessionFuture) {
            throw new IllegalStateException("session not started");
        }
        return this.sessionFuture;
    }

    private StompSession getSessionWithDefaultTimeout() throws Exception {
        return getSessionWithTimeout(this.connectConfig.getReconnectDelay());
    }

    private synchronized StompSession getSessionWithTimeout(long j) throws Exception {
        try {
            return (StompSession) getSessionFuture().get(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e2) {
            destroySessionFuture(getSessionFuture());
            throw e2;
        }
    }

    private void destroySessionFuture(ListenableFuture<StompSession> listenableFuture) {
        listenableFuture.addCallback(new DisconnectCallback());
        listenableFuture.cancel(true);
    }

    protected Runnable createReconnectTask() {
        return () -> {
            try {
                if (!getSessionWithDefaultTimeout().isConnected()) {
                    throw new Exception("Session not connected");
                }
                this.connected = true;
            } catch (InterruptedException e) {
                log.info("Interrupted", e);
            } catch (Exception e2) {
                log.info("Session problem", e2);
                log.info("Renew session");
                connect();
            }
        };
    }

    public void afterSingletonsInstantiated() {
        start();
    }

    private ListenableFutureCallback<? super StompSession> createSetConnectedCallback(final ListenableFuture<StompSession> listenableFuture) {
        return new ListenableFutureCallback<StompSession>() { // from class: ru.vyukov.stomp.StompMessageChannel.1
            public void onFailure(Throwable th) {
                if (!isActualSession()) {
                    StompMessageChannel.log.debug("Ignore disconnect from obsolete session" + listenableFuture);
                } else {
                    StompMessageChannel.this.connected = false;
                    StompMessageChannel.log.info("Connected from session " + listenableFuture);
                }
            }

            public void onSuccess(StompSession stompSession) {
                if (!isActualSession()) {
                    StompMessageChannel.log.debug("Ignore connected from obsolete session" + listenableFuture);
                } else {
                    StompMessageChannel.this.connected = true;
                    StompMessageChannel.log.info("Connected from session " + listenableFuture);
                }
            }

            private boolean isActualSession() {
                return listenableFuture == StompMessageChannel.this.getSessionFuture();
            }
        };
    }
}
