/*
 * Decompiled with CFR 0.152.
 */
package io.leopard.boot.data.queue;

import io.leopard.boot.data.queue.IConsumer;
import io.leopard.boot.data.queue.Queue;
import io.leopard.redis.RedisImpl;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class QueueRedisImpl
implements Queue {
    private Log logger = LogFactory.getLog(this.getClass());
    private RedisImpl redis;
    protected String server;
    protected String password;
    protected int maxActive = 16;
    protected int timeout = 10000;

    public String getServer() {
        return this.server;
    }

    public void setServer(String server) {
        this.server = server;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getMaxActive() {
        return this.maxActive;
    }

    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }

    public int getTimeout() {
        return this.timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public void init() {
        this.redis = new RedisImpl(this.server, this.maxActive, this.timeout);
        this.redis.init();
    }

    public void destroy() {
        this.redis.destroy();
    }

    @Override
    public void publish(String routingKey, String message) {
        this.redis.rpush(routingKey, new String[]{message});
    }

    @Override
    public void subscribe(final String queue, final IConsumer callback) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask(){

            @Override
            public void run() {
                try {
                    QueueRedisImpl.this.subscribe2(queue, callback);
                }
                catch (Exception e) {
                    QueueRedisImpl.this.logger.error((Object)e.getMessage(), (Throwable)e);
                }
            }
        }, 0L, 1000L);
    }

    private void subscribe2(String queue, IConsumer callback) {
        String message;
        while ((message = this.redis.lpop(queue)) != null) {
            try {
                callback.consume(message);
            }
            catch (Exception e) {
                this.logger.error((Object)("message:" + message));
                this.logger.error((Object)e.getMessage(), (Throwable)e);
            }
        }
    }
}

