package com.github.microwww.redis.protocal.operation;

import com.github.microwww.redis.ChannelContext;
import com.github.microwww.redis.ExpectRedisRequest;
import com.github.microwww.redis.database.Bytes;
import com.github.microwww.redis.database.PubSub;
import com.github.microwww.redis.logger.LogFactory;
import com.github.microwww.redis.logger.Logger;
import com.github.microwww.redis.protocal.AbstractOperation;
import com.github.microwww.redis.protocal.RedisOutputProtocol;
import com.github.microwww.redis.protocal.RedisRequest;
import com.github.microwww.redis.protocal.jedis.Protocol;
import com.github.microwww.redis.util.Assert;
import com.github.microwww.redis.util.SafeEncoder;
import com.github.microwww.redis.util.StringUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/* loaded from: input_file:com/github/microwww/redis/protocal/operation/PubSubOperation.class */
public class PubSubOperation extends AbstractOperation {
    private static final Logger log = LogFactory.getLogger(PubSubOperation.class);

    /* loaded from: input_file:com/github/microwww/redis/protocal/operation/PubSubOperation$ChannelMessageListener.class */
    public static class ChannelMessageListener implements Observer {
        private final ChannelContext context;
        private Bytes patten;
        private final Bytes channel;
        private final PubSub pubSub;
        private final ChannelContext.CloseListener channelClose;

        public ChannelMessageListener(ChannelContext channelContext, Bytes bytes, PubSub pubSub) {
            this.context = channelContext;
            this.channel = bytes;
            this.pubSub = pubSub;
            this.channelClose = channelContext.addCloseListener(this::close);
        }

        private void close(ChannelContext channelContext) {
            try {
                unsubscribe();
            } catch (Exception e) {
            }
        }

        @Override // java.util.Observer
        public void update(Observable observable, Object obj) {
            try {
                ArrayList arrayList = new ArrayList(4);
                if (getPatten().isPresent()) {
                    arrayList.add(SafeEncoder.encode("pmessage"));
                    arrayList.add(getPatten().get().getBytes());
                } else {
                    arrayList.add(SafeEncoder.encode("message"));
                }
                arrayList.add(this.channel);
                Assert.isTrue(obj instanceof Bytes, "Observable publish must be `Bytes`");
                arrayList.add(obj);
                RedisOutputProtocol.writerComplex(this.context.getOutputStream(), arrayList.toArray());
                this.context.getOutputStream().flush();
            } catch (Exception e) {
                PubSubOperation.log.warn("Notify subscriber error, ignore, {}", e);
            }
        }

        public void unsubscribe() {
            this.pubSub.unsubscribe(this.channel, this);
            this.context.getSubscribe().removeSubscribe(this.channel);
            this.context.removeCloseListener(this.channelClose);
        }

        public void subscribe() {
            find(this.context, this.channel).ifPresent((v0) -> {
                v0.unsubscribe();
            });
            this.context.getSubscribe().addSubscribe(this.channel, this);
            this.pubSub.subscribe(this.channel, this);
        }

        public static Optional<ChannelMessageListener> find(ChannelContext channelContext, Bytes bytes) {
            return channelContext.getSubscribe().getSubscribe(bytes);
        }

        public ChannelMessageListener setPatten(Bytes bytes) {
            this.patten = bytes;
            return this;
        }

        public Optional<Bytes> getPatten() {
            return Optional.ofNullable(this.patten);
        }
    }

    /* loaded from: input_file:com/github/microwww/redis/protocal/operation/PubSubOperation$NewChannelListener.class */
    public static class NewChannelListener implements Observer {
        private final ChannelContext context;
        private final Bytes bytes;
        private final Pattern patten;
        private final PubSub pubSub;
        private final ChannelContext.CloseListener channelClose;
        private final Map<Bytes, ChannelMessageListener> notifies = new HashMap();

        public NewChannelListener(ChannelContext channelContext, Bytes bytes, PubSub pubSub) {
            this.context = channelContext;
            this.bytes = bytes;
            this.patten = StringUtil.antPattern(SafeEncoder.encode(bytes.getBytes()));
            this.pubSub = pubSub;
            this.channelClose = channelContext.addCloseListener(this::close);
        }

        private void close(ChannelContext channelContext) {
            try {
                unsubscribe();
            } catch (Exception e) {
            }
        }

        @Override // java.util.Observer
        public void update(Observable observable, Object obj) {
            Bytes bytes = (Bytes) obj;
            if (this.patten.matcher(SafeEncoder.encode(bytes.getBytes())).matches()) {
                this.notifies.computeIfAbsent(bytes, bytes2 -> {
                    ChannelMessageListener channelMessageListener = new ChannelMessageListener(this.context, (Bytes) obj, this.pubSub);
                    channelMessageListener.setPatten(this.bytes);
                    this.pubSub.subscribe(bytes, channelMessageListener);
                    return channelMessageListener;
                });
            }
        }

        public void unsubscribe() {
            this.pubSub.newChannelNotify.unsubscribe(this);
            this.notifies.values().forEach(channelMessageListener -> {
                this.pubSub.unsubscribe(channelMessageListener.channel, channelMessageListener);
            });
            this.context.getPattenSubscribe().removeSubscribe(this.bytes);
            this.context.removeCloseListener(this.channelClose);
        }

        public void subscribe() {
            find(this.context, this.bytes).ifPresent((v0) -> {
                v0.unsubscribe();
            });
            this.pubSub.newChannelNotify.subscribe(this);
            this.context.getPattenSubscribe().addSubscribe(this.bytes, this);
        }

        public static Optional<NewChannelListener> find(ChannelContext channelContext, Bytes bytes) {
            return channelContext.getPattenSubscribe().getSubscribe(bytes);
        }

        public Bytes getPatten() {
            return this.bytes;
        }
    }

    /**
     * Subscribes the connection to every pattern given as an argument and writes one
     * {@code psubscribe, pattern, count} reply per pattern, where {@code count} is the size of the
     * connection's pattern-subscription table after that pattern was processed.
     *
     * <p>If subscribing fails, the error is logged, the listener is unsubscribed again and the
     * reply is still written.
     *
     * @param redisRequest the request; requires at least one argument
     * @throws IOException if writing a reply fails
     */
    public void psubscribe(RedisRequest redisRequest) throws IOException {
        redisRequest.expectArgumentsCountGE(1);
        ExpectRedisRequest[] patterns = redisRequest.getArgs();
        // One reply array is reused for every pattern; only slots 1 and 2 change per iteration.
        Object[] reply = {"psubscribe".getBytes(StandardCharsets.UTF_8), null, null};
        PubSub hub = redisRequest.getPubSub();
        for (int i = 0; i < patterns.length; i++) {
            Bytes pattern = patterns[i].toBytes();
            NewChannelListener listener = new NewChannelListener(redisRequest.getContext(), pattern, hub);
            try {
                listener.subscribe();
            } catch (Exception e) {
                log.warn("subscribe [{}] error", listener.patten, e);
                listener.unsubscribe();
            }
            int subscribed = redisRequest.getContext().getPattenSubscribe().subscribeChannels().size();
            reply[1] = pattern.getBytes();
            reply[2] = Integer.valueOf(subscribed);
            RedisOutputProtocol.writerComplex(redisRequest.getOutputStream(), reply);
        }
    }

    /**
     * Publishes a message to a channel and writes the value returned by {@code PubSub.publish}
     * as the reply.
     *
     * @param redisRequest the request; requires exactly two arguments (channel, message)
     * @throws IOException if writing the reply fails
     */
    public void publish(RedisRequest redisRequest) throws IOException {
        PubSub hub = redisRequest.getPubSub();
        redisRequest.expectArgumentsCount(2);
        ExpectRedisRequest[] params = redisRequest.getArgs();
        Bytes channel = params[0].toBytes();
        Bytes message = params[1].toBytes();
        RedisOutputProtocol.writer(redisRequest.getOutputStream(), hub.publish(channel, message));
    }

    /**
     * Handles the {@code PUBSUB} command by dispatching on its first argument, lower-cased:
     * {@code Protocol.PUBSUB_CHANNELS}, {@code Protocol.PUBSUB_NUMSUB} or
     * {@code Protocol.PUBSUB_NUM_PAT}.
     *
     * @param redisRequest the request; requires at least one argument (the sub-command)
     * @throws IOException                   if writing the reply fails
     * @throws UnsupportedOperationException if the sub-command is not one of the three above
     */
    public void pubsub(RedisRequest redisRequest) throws IOException {
        redisRequest.expectArgumentsCountGE(1);
        String lowerCase = redisRequest.getArgs()[0].getByteArray2string().toLowerCase();
        // Explicit equality checks replace the decompiler's hashCode/boolean-selector switch,
        // which was not valid Java (a boolean assigned -1 and 2, duplicate `case true` labels).
        if (Protocol.PUBSUB_CHANNELS.equals(lowerCase)) {
            subChannels(redisRequest);
        } else if (Protocol.PUBSUB_NUMSUB.equals(lowerCase)) {
            subNumSub(redisRequest);
        } else if (Protocol.PUBSUB_NUM_PAT.equals(lowerCase)) {
            subNumPat(redisRequest);
        } else {
            throw new UnsupportedOperationException(lowerCase);
        }
    }

    /**
     * Replies with the names of all active channels, optionally restricted to those matching
     * the pattern given as the second argument (matched with {@code StringUtil.antPatternMatches}).
     *
     * @param redisRequest the request; argument 1, when present, is the pattern
     * @throws IOException if writing the reply fails
     */
    private void subChannels(RedisRequest redisRequest) throws IOException {
        Stream<PubSub.MessageChannel> active = redisRequest.getPubSub().getChannels().values().stream()
                .filter(PubSub.MessageChannel::isActive);
        ExpectRedisRequest[] params = redisRequest.getArgs();
        if (params.length > 1) {
            String glob = params[1].getByteArray2string();
            active = active.filter(
                    candidate -> StringUtil.antPatternMatches(glob, SafeEncoder.encode(candidate.getChannel().getBytes())));
        }
        Object[] names = active.map(PubSub.MessageChannel::getChannel).toArray(Object[]::new);
        RedisOutputProtocol.writerComplex(redisRequest.getOutputStream(), names);
    }

    /**
     * Replies with a flat list of {@code channel, count} pairs, one pair per channel named in
     * arguments 1..n. The count is {@code getNumsub()} of the registered channel, or 0 when the
     * registry has no entry for that name. With no channel arguments the reply is an empty list.
     *
     * @param redisRequest the request; argument 0 is the sub-command and is skipped
     * @throws IOException if writing the reply fails
     */
    private void subNumSub(RedisRequest redisRequest) throws IOException {
        PubSub hub = redisRequest.getPubSub();
        ExpectRedisRequest[] params = redisRequest.getArgs();
        ArrayList<Object> reply = new ArrayList<>();
        // Starting at index 1 makes the loop a no-op when only the sub-command was supplied.
        for (int idx = 1; idx < params.length; idx++) {
            Bytes name = params[idx].toBytes();
            reply.add(name);
            PubSub.MessageChannel registered = hub.getChannels().get(name);
            Integer subscribers = (registered != null)
                    ? Integer.valueOf(registered.getNumsub())
                    : Integer.valueOf(0);
            reply.add(subscribers);
        }
        RedisOutputProtocol.writerComplex(redisRequest.getOutputStream(), reply.toArray());
    }

    /**
     * Replies with the number of distinct patterns reported by
     * {@code pubSub.newChannelNotify.getPattens()}.
     *
     * @param redisRequest the request whose output stream receives the count
     * @throws IOException if writing the reply fails
     */
    private void subNumPat(RedisRequest redisRequest) throws IOException {
        PubSub hub = redisRequest.getPubSub();
        // Copying into a set collapses duplicate patterns before counting.
        int distinctPatterns = new HashSet<Object>(hub.newChannelNotify.getPattens()).size();
        RedisOutputProtocol.writer(redisRequest.getOutputStream(), distinctPatterns);
    }

    /**
     * Removes pattern subscriptions from the connection and writes one
     * {@code punsubscribe, pattern, count} reply per pattern, where {@code count} is the size of
     * the connection's pattern-subscription table after that pattern was processed.
     *
     * <p>With no arguments every pattern the connection currently holds is removed; otherwise
     * only the patterns named in the arguments. The output stream is flushed at the end.
     *
     * @param redisRequest the request; arguments, when present, are the patterns to drop
     * @throws IOException if writing a reply or flushing fails
     */
    public void punsubscribe(RedisRequest redisRequest) throws IOException {
        ExpectRedisRequest[] args = redisRequest.getArgs();
        Object[] objArr = new Object[3];
        objArr[0] = SafeEncoder.encode("punsubscribe");
        if (args.length == 0) {
            // Iterate over a snapshot: unsubscribing removes entries from the live table.
            for (Object key : new HashSet<Object>(redisRequest.getContext().getPattenSubscribe().subscribeChannels().keySet())) {
                Bytes pattern = (Bytes) key;
                objArr[1] = pattern;
                NewChannelListener.find(redisRequest.getContext(), pattern).ifPresent(NewChannelListener::unsubscribe);
                objArr[2] = Integer.valueOf(redisRequest.getContext().getPattenSubscribe().subscribeChannels().size());
                RedisOutputProtocol.writerComplex(redisRequest.getOutputStream(), objArr);
            }
        } else {
            for (ExpectRedisRequest expectRedisRequest : args) {
                Bytes pattern = expectRedisRequest.toBytes();
                objArr[1] = pattern;
                // Look up the PATTERN listener and report the PATTERN count. This branch used to
                // consult the plain channel table, so it dropped channel subscriptions and left
                // the requested pattern subscribed.
                NewChannelListener.find(redisRequest.getContext(), pattern).ifPresent(NewChannelListener::unsubscribe);
                objArr[2] = Integer.valueOf(redisRequest.getContext().getPattenSubscribe().subscribeChannels().size());
                RedisOutputProtocol.writerComplex(redisRequest.getOutputStream(), objArr);
            }
        }
        redisRequest.getOutputStream().flush();
    }

    /**
     * Subscribes the connection to every channel given as an argument and writes one
     * {@code subscribe, channel, count} reply per channel, where {@code count} is the size of the
     * connection's channel-subscription table after that channel was processed.
     *
     * <p>If subscribing fails, the error is logged, the listener is unsubscribed again and the
     * reply is still written.
     *
     * @param redisRequest the request; requires at least one argument
     * @throws IOException if writing a reply fails
     */
    public void subscribe(RedisRequest redisRequest) throws IOException {
        PubSub hub = redisRequest.getPubSub();
        redisRequest.expectArgumentsCountGE(1);
        ExpectRedisRequest[] channels = redisRequest.getArgs();
        // One reply array is reused for every channel; only slots 1 and 2 change per iteration.
        Object[] reply = {"subscribe".getBytes(StandardCharsets.UTF_8), null, null};
        for (int i = 0; i < channels.length; i++) {
            Bytes channel = channels[i].toBytes();
            ChannelMessageListener listener = new ChannelMessageListener(redisRequest.getContext(), channel, hub);
            try {
                listener.subscribe();
            } catch (Exception e) {
                log.warn("subscribe [{}] error", listener.channel, e);
                listener.unsubscribe();
            }
            int subscribed = redisRequest.getContext().getSubscribe().subscribeChannels().size();
            reply[1] = channel.getBytes();
            reply[2] = Integer.valueOf(subscribed);
            RedisOutputProtocol.writerComplex(redisRequest.getOutputStream(), reply);
        }
    }

    /**
     * Removes channel subscriptions from the connection and writes one
     * {@code unsubscribe, channel, count} reply per channel, where {@code count} is the size of
     * the connection's channel-subscription table after that channel was processed.
     *
     * <p>With no arguments every channel the connection currently holds is removed; otherwise
     * only the channels named in the arguments. The output stream is flushed at the end.
     *
     * @param redisRequest the request; arguments, when present, are the channels to drop
     * @throws IOException if writing a reply or flushing fails
     */
    public void unsubscribe(RedisRequest redisRequest) throws IOException {
        ExpectRedisRequest[] params = redisRequest.getArgs();
        Object[] reply = {"unsubscribe".getBytes(StandardCharsets.UTF_8), null, null};
        if (params.length > 0) {
            for (ExpectRedisRequest param : params) {
                dropChannelAndReply(redisRequest, param.toBytes(), reply);
            }
        } else {
            // Iterate over a snapshot: unsubscribing removes entries from the live table.
            for (Object key : new HashSet<Object>(redisRequest.getContext().getSubscribe().subscribeChannels().keySet())) {
                dropChannelAndReply(redisRequest, (Bytes) key, reply);
            }
        }
        redisRequest.getOutputStream().flush();
    }

    /** Unsubscribes the connection's listener for one channel, if any, and writes the reply. */
    private void dropChannelAndReply(RedisRequest redisRequest, Bytes channel, Object[] reply) throws IOException {
        reply[1] = channel;
        ChannelMessageListener.find(redisRequest.getContext(), channel).ifPresent(ChannelMessageListener::unsubscribe);
        reply[2] = Integer.valueOf(redisRequest.getContext().getSubscribe().subscribeChannels().size());
        RedisOutputProtocol.writerComplex(redisRequest.getOutputStream(), reply);
    }
}
