package org.elasticsearch.plugin.river.redis;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/* loaded from: input_file:org/elasticsearch/plugin/river/redis/RedisDriver.class */
public class RedisDriver extends AbstractRiverComponent implements River {
    static final int DEFAULT_REDIS_PORT = 6379;
    static final String DEFAULT_REDIS_INDEX = "redis-index";
    static final String DEFAULT_REDIS_MESSAGE_FIELD = "message";
    static final String DEFAULT_REDIS_CHANNELS = "elasticsearch";
    static final String DEFAULT_REDIS_HOSTNAME = "localhost";
    private static Logger logger = LoggerFactory.getLogger(RedisSubscriber.class);
    private final String hostname;
    private final String password;
    private final String index;
    private final String[] channels;
    private final int port;
    private final int database;
    private final String messageField;
    private final boolean json;
    private final RedisIndexer indexer;
    final RiverSettings settings;
    final Client client;
    RedisSubscriber subscriber;
    Thread thread;

    @Inject
    public RedisDriver(RiverName riverName, RiverSettings riverSettings, @RiverIndexName String str, Client client) {
        super(riverName, riverSettings);
        this.settings = riverSettings;
        this.client = client;
        this.hostname = XContentMapValues.nodeStringValue(XContentMapValues.extractValue("redis.hostname", riverSettings.settings()), DEFAULT_REDIS_HOSTNAME);
        this.port = XContentMapValues.nodeIntegerValue(XContentMapValues.extractValue("redis.port", riverSettings.settings()), DEFAULT_REDIS_PORT);
        this.channels = XContentMapValues.nodeStringValue(XContentMapValues.extractValue("redis.channels", riverSettings.settings()), DEFAULT_REDIS_CHANNELS).split(",");
        this.database = XContentMapValues.nodeIntegerValue(XContentMapValues.extractValue("redis.database", riverSettings.settings()), 0);
        this.password = XContentMapValues.nodeStringValue(XContentMapValues.extractValue("redis.password", riverSettings.settings()), (String) null);
        this.messageField = XContentMapValues.nodeStringValue(XContentMapValues.extractValue("redis.messageField", riverSettings.settings()), DEFAULT_REDIS_MESSAGE_FIELD);
        this.json = XContentMapValues.nodeBooleanValue(XContentMapValues.extractValue("redis.json", riverSettings.settings()), false);
        this.index = XContentMapValues.nodeStringValue(XContentMapValues.extractValue("index.name", riverSettings.settings()), DEFAULT_REDIS_INDEX);
        logger.debug("Redis settings [hostname={}, port={}, database={}]", new Object[]{this.hostname, Integer.valueOf(this.port), Integer.valueOf(this.database)});
        logger.debug("River settings [indexName={}, channels={}, messageField={}, json={}]", new Object[]{this.index, this.channels, this.messageField, Boolean.valueOf(this.json)});
        this.indexer = new RedisIndexer(client, this.index, this.json, this.messageField);
    }

    public void start() {
        logger.info("Starting redis subscriber");
        try {
            ensureIndexCreated();
            try {
                this.subscriber = new RedisSubscriber(this.settings, this.indexer);
                startSubscriberThread(this.subscriber);
            } catch (Exception e) {
                logger.debug("Could not create redis pool. Disabling river");
            }
        } catch (Exception e2) {
            logger.debug("Could not create index. Disabling river");
        }
    }

    void startSubscriberThread(RedisSubscriber redisSubscriber) {
        this.thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "redis_subscription").newThread(new RedisSubscriptionTask(getJedisPool(), redisSubscriber, this.channels));
        this.thread.start();
    }

    JedisPool getJedisPool() {
        return new JedisPool(new JedisPoolConfig(), this.hostname, this.port, 0, this.password, this.database);
    }

    void ensureIndexCreated() {
        try {
            logger.debug("Creating index [{}]...", this.index);
            this.client.admin().indices().prepareCreate(this.index).execute().actionGet();
            logger.error("... created");
        } catch (Exception e) {
            if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
                logger.debug("... index already exists");
            } else {
                logger.error("... error {}", e);
                throw e;
            }
        }
    }

    public void close() {
        if (this.subscriber == null || !this.subscriber.isSubscribed()) {
            return;
        }
        this.subscriber.unsubscribe();
    }

    public String getPassword() {
        return this.password;
    }

    public String getHostname() {
        return this.hostname;
    }

    public int getPort() {
        return this.port;
    }

    public int getDatabase() {
        return this.database;
    }

    public String getMessageField() {
        return this.messageField;
    }

    public String[] getChannels() {
        return this.channels;
    }

    public String getIndex() {
        return this.index;
    }

    public boolean isJson() {
        return this.json;
    }
}
