package org.apache.inlong.agent.plugin.sources.reader;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.moilioncircle.redis.replicator.RedisReplicator;
import com.moilioncircle.redis.replicator.Replicator;
import com.moilioncircle.redis.replicator.cmd.CommandName;
import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
import com.moilioncircle.redis.replicator.event.Event;
import com.moilioncircle.redis.replicator.event.EventListener;
import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/reader/RedisReader.class */
public class RedisReader extends org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader {
    public static final String REDIS_READER_TAG_NAME = "AgentRedisMetric";
    public static final String JOB_REDIS_PORT = "job.redisJob.port";
    public static final String JOB_REDIS_HOSTNAME = "job.redisJob.hostname";
    public static final String JOB_REDIS_SSL = "job.redisJob.ssl";
    public static final String JOB_REDIS_AUTHUSER = "job.redisJob.authUser";
    public static final String JOB_REDIS_AUTHPASSWORD = "job.redisJob.authPassword";
    public static final String JOB_REDIS_READTIMEOUT = "job.redisJob.readTimeout";
    public static final String JOB_REDIS_QUEUE_SIZE = "job.redisJob.queueSize";
    public static final String JOB_REDIS_REPLID = "job.redisJob.replId";
    public static final String JOB_REDIS_OFFSET = "job.redisJob.offset";
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisReader.class);
    private String port;
    private String hostName;
    private boolean ssl;
    private String authUser;
    private String authPassword;
    private String readTimeout;
    private String instanceId;
    private String replId;
    private String snapShot;
    private boolean destroyed;
    private Replicator redisReplicator;
    private LinkedBlockingQueue<String> redisMessageQueue;
    private boolean finished = false;
    private ExecutorService executor;
    private Gson gson;

    @Override // org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader
    public void init(InstanceProfile instanceProfile) {
        LOGGER.info("Init redis reader with jobConf {}", instanceProfile.toJsonStr());
        this.port = instanceProfile.get(JOB_REDIS_PORT);
        this.hostName = instanceProfile.get(JOB_REDIS_HOSTNAME);
        this.ssl = instanceProfile.getBoolean(JOB_REDIS_SSL, false);
        this.authUser = instanceProfile.get(JOB_REDIS_AUTHUSER, "");
        this.authPassword = instanceProfile.get(JOB_REDIS_AUTHPASSWORD, "");
        this.readTimeout = instanceProfile.get(JOB_REDIS_READTIMEOUT, "");
        this.replId = instanceProfile.get(JOB_REDIS_REPLID, "");
        this.snapShot = instanceProfile.get(JOB_REDIS_OFFSET, "-1");
        this.instanceId = instanceProfile.getInstanceId();
        this.finished = false;
        this.redisMessageQueue = new LinkedBlockingQueue<>(instanceProfile.getInt(JOB_REDIS_QUEUE_SIZE, LogFileCollectTask.CORE_THREAD_PRINT_TIME));
        initGson();
        try {
            this.redisReplicator = new RedisReplicator(getRedisUri());
            initReplicator();
            this.redisReplicator.addEventListener(new EventListener() { // from class: org.apache.inlong.agent.plugin.sources.reader.RedisReader.1
                public void onEvent(Replicator replicator, Event event) {
                    try {
                        if ((event instanceof DefaultCommand) || (event instanceof KeyValuePair)) {
                            RedisReader.this.redisMessageQueue.put(RedisReader.this.gson.toJson(event));
                            AuditUtils.add(3, RedisReader.this.inlongGroupId, RedisReader.this.inlongStreamId, System.currentTimeMillis(), 1, r0.length());
                            RedisReader.this.readerMetric.pluginReadCount.incrementAndGet();
                        }
                        if (event instanceof PostRdbSyncEvent) {
                            RedisReader.this.snapShot = String.valueOf(replicator.getConfiguration().getReplOffset());
                            RedisReader.LOGGER.info("after rdb snapShot is: {}", RedisReader.this.snapShot);
                        }
                    } catch (InterruptedException e) {
                        RedisReader.this.readerMetric.pluginReadFailCount.incrementAndGet();
                        RedisReader.LOGGER.error("Read redis data error", e);
                    }
                }
            });
            this.executor = Executors.newSingleThreadExecutor();
            this.executor.execute(new Thread(() -> {
                try {
                    this.redisReplicator.open();
                } catch (IOException e) {
                    LOGGER.error("Redis source error", e);
                }
            }));
        } catch (IOException | URISyntaxException e) {
            this.readerMetric.pluginReadFailCount.addAndGet(1L);
            LOGGER.error("Connect to redis {}:{} failed.", this.hostName, this.port);
        }
    }

    private String getRedisUri() {
        StringBuffer stringBuffer = new StringBuffer("redis://");
        stringBuffer.append(this.hostName).append(":").append(this.port);
        stringBuffer.append("?");
        if (!StringUtils.isEmpty(this.authPassword)) {
            stringBuffer.append("authPassword=").append(this.authPassword).append("&");
        }
        if (!StringUtils.isEmpty(this.authUser)) {
            stringBuffer.append("authUser=").append(this.authUser).append("&");
        }
        if (!StringUtils.isEmpty(this.readTimeout)) {
            stringBuffer.append("readTimeout=").append(this.readTimeout).append("&");
        }
        if (this.ssl) {
            stringBuffer.append("ssl=").append("yes").append("&");
        }
        if (!StringUtils.isEmpty(this.snapShot)) {
            stringBuffer.append("replOffset=").append(this.snapShot).append("&");
        }
        if (!StringUtils.isEmpty(this.replId)) {
            stringBuffer.append("replId=").append(this.replId).append("&");
        }
        if (stringBuffer.charAt(stringBuffer.length() - 1) == '?' || stringBuffer.charAt(stringBuffer.length() - 1) == '&') {
            stringBuffer.deleteCharAt(stringBuffer.length() - 1);
        }
        return stringBuffer.toString();
    }

    public void destroy() {
        synchronized (this) {
            if (!this.destroyed) {
                try {
                    this.executor.shutdown();
                    this.redisReplicator.close();
                } catch (IOException e) {
                    LOGGER.error("Redis reader close failed.");
                }
                this.destroyed = true;
            }
        }
    }

    public Message read() {
        if (this.redisMessageQueue.isEmpty()) {
            return null;
        }
        this.readerMetric.pluginReadCount.incrementAndGet();
        return new DefaultMessage(this.redisMessageQueue.poll().getBytes());
    }

    public boolean isFinished() {
        return this.finished;
    }

    public String getReadSource() {
        return this.instanceId;
    }

    public void setReadSource(String str) {
        this.instanceId = str;
    }

    public void setReadTimeout(long j) {
    }

    public void setWaitMillisecond(long j) {
    }

    public String getSnapshot() {
        return this.snapShot;
    }

    public void finishRead() {
        this.finished = true;
    }

    public boolean isSourceExist() {
        return true;
    }

    private void initGson() {
        this.gson = new GsonBuilder().registerTypeAdapter(KeyStringValueHash.class, new TypeAdapter<KeyStringValueHash>() { // from class: org.apache.inlong.agent.plugin.sources.reader.RedisReader.7
            public void write(JsonWriter jsonWriter, KeyStringValueHash keyStringValueHash) throws IOException {
                jsonWriter.beginObject();
                jsonWriter.name("DB").beginObject();
                jsonWriter.name("dbNumber").value(keyStringValueHash.getDb().getDbNumber());
                jsonWriter.name("dbSize").value(keyStringValueHash.getDb().getDbsize());
                jsonWriter.name("expires").value(keyStringValueHash.getDb().getExpires());
                jsonWriter.endObject();
                jsonWriter.name("valueRdbType").value(keyStringValueHash.getValueRdbType());
                jsonWriter.name("key").value(new String((byte[]) keyStringValueHash.getKey()));
                jsonWriter.name("value").beginObject();
                for (byte[] bArr : ((Map) keyStringValueHash.getValue()).keySet()) {
                    jsonWriter.name(new String(bArr)).value(new String((byte[]) ((Map) keyStringValueHash.getValue()).get(bArr)));
                }
                jsonWriter.endObject();
                jsonWriter.endObject();
            }

            /* renamed from: read, reason: merged with bridge method [inline-methods] */
            public KeyStringValueHash m38read(JsonReader jsonReader) throws IOException {
                return null;
            }
        }).registerTypeAdapter(DefaultCommand.class, new TypeAdapter<DefaultCommand>() { // from class: org.apache.inlong.agent.plugin.sources.reader.RedisReader.6
            public void write(JsonWriter jsonWriter, DefaultCommand defaultCommand) throws IOException {
                jsonWriter.beginObject();
                jsonWriter.name("key").value(new String(defaultCommand.getCommand()));
                jsonWriter.name("value").beginArray();
                for (byte[] bArr : defaultCommand.getArgs()) {
                    jsonWriter.value(new String(bArr));
                }
                jsonWriter.endArray();
                jsonWriter.endObject();
            }

            /* renamed from: read, reason: merged with bridge method [inline-methods] */
            public DefaultCommand m37read(JsonReader jsonReader) throws IOException {
                return null;
            }
        }).registerTypeAdapter(KeyStringValueList.class, new TypeAdapter<KeyStringValueList>() { // from class: org.apache.inlong.agent.plugin.sources.reader.RedisReader.5
            public void write(JsonWriter jsonWriter, KeyStringValueList keyStringValueList) throws IOException {
                jsonWriter.beginObject();
                jsonWriter.name("key").value(new String((byte[]) keyStringValueList.getKey()));
                jsonWriter.name("value").beginArray();
                Iterator it = ((List) keyStringValueList.getValue()).iterator();
                while (it.hasNext()) {
                    jsonWriter.value(new String((byte[]) it.next()));
                }
                jsonWriter.endArray();
                jsonWriter.endObject();
            }

            /* renamed from: read, reason: merged with bridge method [inline-methods] */
            public KeyStringValueList m36read(JsonReader jsonReader) throws IOException {
                return null;
            }
        }).registerTypeAdapter(KeyStringValueSet.class, new TypeAdapter<KeyStringValueSet>() { // from class: org.apache.inlong.agent.plugin.sources.reader.RedisReader.4
            public void write(JsonWriter jsonWriter, KeyStringValueSet keyStringValueSet) throws IOException {
                jsonWriter.beginObject();
                jsonWriter.name("key").value(new String((byte[]) keyStringValueSet.getKey()));
                jsonWriter.name("value").beginArray();
                Iterator it = ((Set) keyStringValueSet.getValue()).iterator();
                while (it.hasNext()) {
                    jsonWriter.value(new String((byte[]) it.next()));
                }
                jsonWriter.endArray();
                jsonWriter.endObject();
            }

            /* renamed from: read, reason: merged with bridge method [inline-methods] */
            public KeyStringValueSet m35read(JsonReader jsonReader) throws IOException {
                return null;
            }
        }).registerTypeAdapter(KeyStringValueString.class, new TypeAdapter<KeyStringValueString>() { // from class: org.apache.inlong.agent.plugin.sources.reader.RedisReader.3
            public void write(JsonWriter jsonWriter, KeyStringValueString keyStringValueString) throws IOException {
                jsonWriter.beginObject();
                jsonWriter.name("key").value(new String((byte[]) keyStringValueString.getKey()));
                jsonWriter.name("value").value(new String((byte[]) keyStringValueString.getValue()));
                jsonWriter.endObject();
            }

            /* renamed from: read, reason: merged with bridge method [inline-methods] */
            public KeyStringValueString m34read(JsonReader jsonReader) throws IOException {
                return null;
            }
        }).registerTypeAdapter(KeyStringValueZSet.class, new TypeAdapter<KeyStringValueZSet>() { // from class: org.apache.inlong.agent.plugin.sources.reader.RedisReader.2
            public void write(JsonWriter jsonWriter, KeyStringValueZSet keyStringValueZSet) throws IOException {
                jsonWriter.beginObject();
                jsonWriter.name("key").value(new String((byte[]) keyStringValueZSet.getKey()));
                jsonWriter.name("value").beginArray();
                for (ZSetEntry zSetEntry : (Set) keyStringValueZSet.getValue()) {
                    jsonWriter.beginObject();
                    jsonWriter.name("element").value(new String(zSetEntry.getElement()));
                    jsonWriter.name("score").value(zSetEntry.getScore());
                    jsonWriter.endObject();
                }
                jsonWriter.endArray();
                jsonWriter.endObject();
            }

            /* renamed from: read, reason: merged with bridge method [inline-methods] */
            public KeyStringValueZSet m33read(JsonReader jsonReader) throws IOException {
                return null;
            }
        }).create();
    }

    private void initReplicator() {
        DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
        this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SET"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SETEX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("MSET"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("DEL"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SADD"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("HMSET"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("HSET"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("LSET"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("EXPIRE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("EXPIREAT"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("GETSET"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("HSETNX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("MSETNX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("PSETEX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SETNX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SETRANGE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("HDEL"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("LPOP"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("LPUSH"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("LPUSHX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("LRem"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("RPOP"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("RPUSH"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("RPUSHX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZREM"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("RENAME"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("INCR"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("DECR"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("INCRBY"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("DECRBY"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("PERSIST"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SELECT"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("FLUSHALL"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("FLUSHDB"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("HINCRBY"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZINCRBY"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("MOVE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SMOVE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("PFADD"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("PFCOUNT"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("PFMERGE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZADD"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("LINSERT"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("RENAMENX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("RESTORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("PEXPIRE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("GEOADD"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("EVAL"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("EVALSHA"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SCRIPT"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("PUBLISH"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("BITOP"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("BITFIELD"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SETBIT"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SREM"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("UNLINK"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SWAPDB"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("MULTI"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("EXEC"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("LTRIM"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SORT"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("REPLCONF"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("XACK"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("XADD"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("XCLAIM"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("XDEL"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("XGROUP"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("XTRIM"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("XSETID"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("COPY"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("LMOVE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("BLMOVE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("SPUBLISH"), defaultCommandParser);
        this.redisReplicator.addCommandParser(CommandName.name("FUNCTION"), defaultCommandParser);
    }
}
