package com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redisson.reactive;

import com.github.linyuzai.connection.loadbalance.core.concept.AliveForeverConnection;
import com.github.linyuzai.connection.loadbalance.core.message.MessageTransportException;
import com.github.linyuzai.connection.loadbalance.core.message.PingMessage;
import java.util.function.Consumer;
import org.redisson.RedissonReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;

/* loaded from: input_file:com/github/linyuzai/connection/loadbalance/autoconfigure/subscribe/redisson/reactive/ReactiveRedissonTopicObservableConnection.class */
public class ReactiveRedissonTopicObservableConnection extends AliveForeverConnection {
    private Object id;
    private RedissonReactiveClient client;
    private RTopicReactive topic;

    public ReactiveRedissonTopicObservableConnection() {
        setType("Connection@observable");
    }

    public void doSend(Object obj, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2) {
        this.topic.publish(obj).subscribe(l -> {
            runnable.run();
        }, th -> {
            if (th instanceof RedisException) {
                consumer.accept(new MessageTransportException(th));
            } else {
                consumer.accept(th);
            }
        }, runnable2);
    }

    public void doPing(PingMessage pingMessage, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2) {
        RedissonReactive redissonReactive = this.client;
        redissonReactive.getCommandExecutor().reactive(() -> {
            return redissonReactive.getCommandExecutor().readAsync((byte[]) null, StringCodec.INSTANCE, RedisCommands.PING, new Object[0]);
        }).subscribe(str -> {
            if ("PONG".equalsIgnoreCase(str)) {
                runnable.run();
            } else {
                consumer.accept(new IllegalStateException("Redis ping: " + str));
            }
        }, consumer, runnable2);
    }

    public void doClose(Object obj, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2) {
        runnable2.run();
    }

    public Object getId() {
        return this.id;
    }

    public RedissonReactiveClient getClient() {
        return this.client;
    }

    public RTopicReactive getTopic() {
        return this.topic;
    }

    public void setId(Object obj) {
        this.id = obj;
    }

    public void setClient(RedissonReactiveClient redissonReactiveClient) {
        this.client = redissonReactiveClient;
    }

    public void setTopic(RTopicReactive rTopicReactive) {
        this.topic = rTopicReactive;
    }
}
