package org.elasticsearch.river.rss;

import com.rometools.rome.feed.rss.Channel;
import com.rometools.rome.feed.synd.SyndEntry;
import com.rometools.rome.feed.synd.SyndFeed;
import com.rometools.rome.io.FeedException;
import com.rometools.rome.io.SyndFeedInput;
import com.rometools.rome.io.XmlReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.format.ISODateTimeFormat;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.rss.RssToJson;

/* loaded from: input_file:org/elasticsearch/river/rss/RssRiver.class */
/**
 * River that periodically polls one or more RSS/Atom feeds (parsed with ROME) and indexes
 * their entries into Elasticsearch.
 *
 * <p>Settings read by the constructor:
 * <ul>
 *   <li>{@code rss.feeds[]}, each with {@code name}, {@code url}, {@code update_rate}
 *       (default 15 minutes) and {@code ignore_ttl} (default false); or the deprecated
 *       single-feed keys {@code rss.url} / {@code rss.update_rate} / {@code rss.ignore_ttl};</li>
 *   <li>the {@code RssToJson.Rss.RAW} flag under {@code rss} (default true);</li>
 *   <li>{@code index.index}, {@code index.type}, {@code index.bulk_size},
 *       {@code index.flush_interval} and {@code index.max_concurrent_bulk}.</li>
 * </ul>
 * Without an {@code rss} section a single hard-coded default feed is used.
 *
 * <p>{@link #start()} creates the index and mapping, builds a shared {@link BulkProcessor} and
 * starts one polling thread per feed. For every feed the date of the last processed update is
 * written to the {@code _river} index under the id {@code _lastupdated_<uuid(url)>}.
 */
public class RssRiver extends AbstractRiverComponent implements River {
    private final Client client;
    /** Index receiving the feed entries. */
    private final String indexName;
    /** Mapping type of the feed entry documents. */
    private final String typeName;
    /** Passed to {@link RssToJson} when building entry documents and their mapping. */
    private final Boolean raw;
    private final int bulkSize;
    private final int maxConcurrentBulk;
    private final TimeValue bulkFlushInterval;
    /** Shared by all polling threads; null until {@link #start()} has built it. */
    private volatile BulkProcessor bulkProcessor;
    /** Polling threads, one per feed; null until {@link #start()} creates them. */
    private volatile ArrayList<Thread> threads;
    /** Set by {@link #close()}; polling loops exit when they observe it. */
    private volatile boolean closed;
    private final ArrayList<RssRiverFeedDefinition> feedsDefinition;

    /**
     * Polls a single feed URL until the river is closed, indexing entries whose id is not
     * already present in the target index.
     */
    private class RSSParser implements Runnable {
        private String url;
        private TimeValue updateRate;
        private String feedname;
        private boolean ignoreTtl;

        /**
         * @param feedname   feed name stored with each entry (may be null)
         * @param url        feed URL
         * @param updateRate delay between two polls
         * @param ignoreTtl  when true, a positive {@code ttl} advertised by an RSS channel does
         *                   not replace {@code updateRate}
         */
        public RSSParser(String feedname, String url, TimeValue updateRate, boolean ignoreTtl) {
            this.feedname = feedname;
            this.url = url;
            this.updateRate = updateRate;
            this.ignoreTtl = ignoreTtl;
            if (RssRiver.this.logger.isInfoEnabled()) {
                // TimeValue prints its own unit, so no unit suffix is appended here.
                RssRiver.this.logger.info("creating rss stream river [{}] for [{}] every [{}]", new Object[]{feedname, url, updateRate});
            }
        }

        /** Builds a parser from a feed definition; {@code rssRiver} is unused (outer instance is implicit). */
        public RSSParser(RssRiver rssRiver, RssRiverFeedDefinition rssRiverFeedDefinition) {
            this(rssRiverFeedDefinition.getFeedname(), rssRiverFeedDefinition.getUrl(), rssRiverFeedDefinition.getUpdateRate(), rssRiverFeedDefinition.isIgnoreTtl());
        }

        /** Fetches and processes the feed, then sleeps for the update rate, until the river is closed. */
        @Override
        public void run() {
            while (!RssRiver.this.closed) {
                SyndFeed feed = RssRiver.this.getFeed(this.url);
                if (feed != null) {
                    processFeed(feed);
                    adjustUpdateRateFromTtl(feed);
                }
                try {
                    if (RssRiver.this.logger.isDebugEnabled()) {
                        RssRiver.this.logger.debug("Rss river is going to sleep for {}", new Object[]{this.updateRate});
                    }
                    Thread.sleep(this.updateRate.millis());
                } catch (InterruptedException ignored) {
                    // close() interrupts the thread to cut the sleep short; the loop condition
                    // then observes `closed` and exits.
                }
            }
        }

        /**
         * Queues the new entries of {@code feed} for indexing and records the date of this
         * update in the {@code _river} index.
         */
        private void processFeed(SyndFeed feed) {
            if (RssRiver.this.logger.isDebugEnabled()) {
                RssRiver.this.logger.debug("Reading feed from {}", new Object[]{this.url});
            }
            Date publishedDate = feed.getPublishedDate();
            if (RssRiver.this.logger.isDebugEnabled()) {
                RssRiver.this.logger.debug("Feed publish date is {}", new Object[]{publishedDate});
            }
            String lastUpdatedId = "_lastupdated_" + UUID.nameUUIDFromBytes(this.url.getBytes()).toString();
            Date lastDateFromRiver = getLastDateFromRiver(lastUpdatedId);
            // Only a feed whose own publish date is not newer than the stored date is skipped.
            // A feed without a publish date cannot be compared, so its entries are examined
            // every time (entries already indexed are skipped by the existence check below).
            if (lastDateFromRiver != null && publishedDate != null && !publishedDate.after(lastDateFromRiver)) {
                if (RssRiver.this.logger.isDebugEnabled()) {
                    RssRiver.this.logger.debug("Nothing new in the feed... Relaxing...", new Object[0]);
                }
                return;
            }
            if (RssRiver.this.logger.isTraceEnabled()) {
                RssRiver.this.logger.trace("Feed is updated : {}", new Object[]{feed});
            }
            try {
                // Newest entry date later than the stored date; only tracked when the feed has no date.
                Date newestEntryDate = null;
                for (SyndEntry syndEntry : feed.getEntries()) {
                    if (publishedDate == null) {
                        Date updated = syndEntry.getUpdatedDate();
                        if (updated != null
                                && (lastDateFromRiver == null || updated.after(lastDateFromRiver))
                                && (newestEntryDate == null || updated.after(newestEntryDate))) {
                            newestEntryDate = updated;
                            if (RssRiver.this.logger.isTraceEnabled()) {
                                RssRiver.this.logger.trace("No feed date. Using item updated date : {}", new Object[]{newestEntryDate});
                            }
                        }
                        Date published = syndEntry.getPublishedDate();
                        if (published != null
                                && (lastDateFromRiver == null || published.after(lastDateFromRiver))
                                && (newestEntryDate == null || published.after(newestEntryDate))) {
                            newestEntryDate = published;
                            if (RssRiver.this.logger.isTraceEnabled()) {
                                RssRiver.this.logger.trace("No feed date. Using item published date : {}", new Object[]{newestEntryDate});
                            }
                        }
                    }
                    String uuid = entryUuid(syndEntry);
                    GetResponse existing = (GetResponse) RssRiver.this.client.prepareGet(RssRiver.this.indexName, RssRiver.this.typeName, uuid).execute().actionGet();
                    if (!existing.isExists()) {
                        RssRiver.this.bulkProcessor.add(Requests.indexRequest(RssRiver.this.indexName).type(RssRiver.this.typeName).id(uuid).source(RssToJson.toJson(syndEntry, RssRiver.this.riverName.getName(), this.feedname, RssRiver.this.raw.booleanValue())));
                        if (RssRiver.this.logger.isDebugEnabled()) {
                            RssRiver.this.logger.debug("FeedMessage update detected for source [{}]", new Object[]{this.feedname != null ? this.feedname : "undefined"});
                        }
                        if (RssRiver.this.logger.isTraceEnabled()) {
                            RssRiver.this.logger.trace("FeedMessage is : {}", new Object[]{syndEntry});
                        }
                    } else if (RssRiver.this.logger.isTraceEnabled()) {
                        RssRiver.this.logger.trace("FeedMessage {} already exist. Ignoring", new Object[]{uuid});
                    }
                }
                if (publishedDate == null) {
                    // When no entry is newer, keep the stored date instead of overwriting it with null.
                    publishedDate = newestEntryDate != null ? newestEntryDate : lastDateFromRiver;
                }
                if (RssRiver.this.logger.isTraceEnabled()) {
                    RssRiver.this.logger.trace("processing [_seq  ]: [{}]/[{}]/[{}], last_seq [{}]", new Object[]{RssRiver.this.indexName, RssRiver.this.riverName.name(), lastUpdatedId, publishedDate});
                }
                RssRiver.this.bulkProcessor.add(Requests.indexRequest("_river").type(RssRiver.this.riverName.name()).id(lastUpdatedId).source(XContentFactory.jsonBuilder().startObject().startObject("rss").field(lastUpdatedId, publishedDate).endObject().endObject()));
            } catch (IOException e) {
                RssRiver.this.logger.warn("failed to add feed message entry to bulk indexing", e, new Object[0]);
            }
        }

        /**
         * Derives the document id of an entry from its description text. Entries without a
         * description text fall back to their URI, then their link, so they do not all collapse
         * onto the single id of the empty string.
         */
        private String entryUuid(SyndEntry syndEntry) {
            String key = null;
            if (syndEntry.getDescription() != null) {
                key = syndEntry.getDescription().getValue();
            }
            if (key == null) {
                key = syndEntry.getUri();
            }
            if (key == null) {
                key = syndEntry.getLink();
            }
            if (key == null) {
                key = "";
            }
            // Default-charset getBytes() is kept: changing it would change the ids of entries
            // that are already indexed and cause them to be indexed again.
            return UUID.nameUUIDFromBytes(key.getBytes()).toString();
        }

        /**
         * Replaces the update rate with the positive {@code ttl} (taken as minutes) of an RSS
         * channel, unless {@code ignoreTtl} is set or the feed is not an RSS channel.
         */
        private void adjustUpdateRateFromTtl(SyndFeed feed) {
            if (this.ignoreTtl) {
                return;
            }
            Object wireFeed = feed.originalWireFeed();
            if (!(wireFeed instanceof Channel)) {
                // Also covers a null wire feed.
                return;
            }
            int ttl = ((Channel) wireFeed).getTtl();
            if (ttl > 0 && ttl != this.updateRate.minutes()) {
                this.updateRate = TimeValue.timeValueMinutes(ttl);
                if (RssRiver.this.logger.isInfoEnabled()) {
                    RssRiver.this.logger.info("Auto adjusting update rate with provided ttl: {}", new Object[]{this.updateRate});
                }
            }
        }

        /**
         * Reads the last processed update date stored for this feed in the {@code _river} index.
         *
         * @param str id of the {@code _river} document, also used as the field name under {@code rss}
         * @return the stored date, or null if none is stored or it could not be read
         */
        private Date getLastDateFromRiver(String str) {
            Date date = null;
            try {
                RssRiver.this.client.admin().indices().prepareRefresh(new String[]{"_river"}).execute().actionGet();
                GetResponse getResponse = (GetResponse) RssRiver.this.client.prepareGet("_river", RssRiver.this.riverName().name(), str).execute().actionGet();
                if (getResponse.isExists()) {
                    Map map = (Map) getResponse.getSourceAsMap().get("rss");
                    Object obj = map != null ? map.get(str) : null;
                    if (obj != null) {
                        date = ISODateTimeFormat.dateOptionalTimeParser().parseDateTime(obj.toString()).toDate();
                    }
                } else if (RssRiver.this.logger.isDebugEnabled()) {
                    RssRiver.this.logger.debug("{} doesn't exist", new Object[]{str});
                }
            } catch (Exception e) {
                RssRiver.this.logger.warn("failed to get _lastupdate, throttling....", e, new Object[0]);
            }
            return date;
        }
    }

    /**
     * Reads the feed and index settings of the river.
     *
     * @throws MalformedURLException declared by the original signature
     */
    @Inject
    public RssRiver(RiverName riverName, RiverSettings riverSettings, Client client) throws MalformedURLException {
        super(riverName, riverSettings);
        this.closed = false;
        this.client = client;
        if (riverSettings.settings().containsKey("rss")) {
            Map map = (Map) riverSettings.settings().get("rss");
            if (XContentMapValues.isArray(map.get("feeds"))) {
                ArrayList arrayList = (ArrayList) map.get("feeds");
                this.feedsDefinition = new ArrayList<>(arrayList.size());
                for (Object feed : arrayList) {
                    Map feedSettings = (Map) feed;
                    this.feedsDefinition.add(new RssRiverFeedDefinition(
                            XContentMapValues.nodeStringValue(feedSettings.get("name"), (String) null),
                            XContentMapValues.nodeStringValue(feedSettings.get(RssToJson.Rss.Enclosures.URL), (String) null),
                            TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(feedSettings.get("update_rate"), (String) null), TimeValue.timeValueMinutes(15L)),
                            XContentMapValues.nodeBooleanValue(feedSettings.get("ignore_ttl"), false)));
                }
            } else {
                this.logger.warn("rss.url and rss.update_rate have been deprecated. Use rss.feeds[].url and rss.feeds[].update_rate instead.", new Object[0]);
                this.logger.warn("See https://github.com/dadoonet/rssriver/issues/6 for more details...", new Object[0]);
                String url = XContentMapValues.nodeStringValue(map.get(RssToJson.Rss.Enclosures.URL), (String) null);
                TimeValue updateRate = TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(map.get("update_rate"), (String) null), TimeValue.timeValueMinutes(15L));
                // Read the configured value; parsing the literal key "ignore_ttl" always yielded true.
                boolean ignoreTtl = XContentMapValues.nodeBooleanValue(map.get("ignore_ttl"), false);
                this.feedsDefinition = new ArrayList<>(1);
                this.feedsDefinition.add(new RssRiverFeedDefinition(null, url, updateRate, ignoreTtl));
            }
            this.raw = Boolean.valueOf(XContentMapValues.nodeBooleanValue(map.get(RssToJson.Rss.RAW), true));
        } else {
            this.logger.warn("You didn't define the rss url. Switching to defaults : [{}]", new Object[]{"http://www.lemonde.fr/rss/une.xml"});
            this.feedsDefinition = new ArrayList<>(1);
            this.feedsDefinition.add(new RssRiverFeedDefinition("lemonde", "http://www.lemonde.fr/rss/une.xml", TimeValue.timeValueMinutes(15L), false));
            this.raw = true;
        }
        if (!riverSettings.settings().containsKey("index")) {
            this.indexName = riverName.name();
            this.typeName = "page";
            // NOTE(review): default bulk size is 100 here but 25 when an "index" section omits
            // bulk_size; confirm which default is intended.
            this.bulkSize = 100;
            this.maxConcurrentBulk = 1;
            this.bulkFlushInterval = TimeValue.timeValueSeconds(5L);
            return;
        }
        Map indexSettings = (Map) riverSettings.settings().get("index");
        this.indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
        this.typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "page");
        this.bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 25);
        this.bulkFlushInterval = TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(indexSettings.get("flush_interval"), (String) null), TimeValue.timeValueSeconds(5L));
        this.maxConcurrentBulk = XContentMapValues.nodeIntegerValue(indexSettings.get("max_concurrent_bulk"), 1);
    }

    /**
     * Creates the target index (tolerating an existing index or a cluster block), pushes the
     * entry mapping, builds the bulk processor and starts one polling thread per feed. Any
     * other failure is logged and leaves the river disabled.
     */
    @Override
    public void start() {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Starting rss stream", new Object[0]);
        }
        try {
            this.client.admin().indices().prepareCreate(this.indexName).execute().actionGet();
        } catch (Exception e) {
            Throwable cause = ExceptionsHelper.unwrapCause(e);
            if (!(cause instanceof IndexAlreadyExistsException) && !(cause instanceof ClusterBlockException)) {
                this.logger.warn("failed to create index [{}], disabling river...", e, new Object[]{this.indexName});
                return;
            }
        }
        try {
            pushMapping(this.indexName, this.typeName, RssToJson.buildRssMapping(this.typeName, this.raw.booleanValue()));
            this.bulkProcessor = BulkProcessor.builder(this.client, new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest bulkRequest) {
                    RssRiver.this.logger.debug("Going to execute new bulk composed of {} actions", new Object[]{Integer.valueOf(bulkRequest.numberOfActions())});
                }

                @Override
                public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                    RssRiver.this.logger.debug("Executed bulk composed of {} actions", new Object[]{Integer.valueOf(bulkRequest.numberOfActions())});
                    if (bulkResponse.hasFailures()) {
                        // The message needs a placeholder, otherwise the failure details are dropped.
                        RssRiver.this.logger.warn("There were failures while executing bulk: {}", new Object[]{bulkResponse.buildFailureMessage()});
                        if (RssRiver.this.logger.isDebugEnabled()) {
                            for (BulkItemResponse item : bulkResponse.getItems()) {
                                if (item.isFailed()) {
                                    RssRiver.this.logger.debug("Error for {}/{}/{} for {} operation: {}", new Object[]{item.getIndex(), item.getType(), item.getId(), item.getOpType(), item.getFailureMessage()});
                                }
                            }
                        }
                    }
                }

                @Override
                public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable failure) {
                    RssRiver.this.logger.warn("Error executing bulk", failure, new Object[0]);
                }
            }).setBulkActions(this.bulkSize).setConcurrentRequests(this.maxConcurrentBulk).setFlushInterval(this.bulkFlushInterval).build();
            this.threads = new ArrayList<>(this.feedsDefinition.size());
            int i = 0;
            for (RssRiverFeedDefinition definition : this.feedsDefinition) {
                Thread thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "rss_slurper_" + i).newThread(new RSSParser(this, definition));
                thread.start();
                this.threads.add(thread);
                i++;
            }
        } catch (Exception e2) {
            this.logger.warn("failed to create mapping for [{}/{}], disabling river...", e2, new Object[]{this.indexName, this.typeName});
        }
    }

    /**
     * Stops the river: flags the polling loops to exit, closes the bulk processor (if
     * {@link #start()} got far enough to build it) and interrupts the polling threads.
     */
    @Override
    public void close() {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Closing rss river", new Object[0]);
        }
        this.closed = true;
        // start() may have returned before building the processor (e.g. index creation failed).
        BulkProcessor processor = this.bulkProcessor;
        if (processor != null) {
            processor.close();
        }
        if (this.threads != null) {
            for (Thread thread : this.threads) {
                if (thread != null) {
                    thread.interrupt();
                }
            }
        }
    }

    /**
     * Downloads and parses the feed at {@code str}, keeping the original wire feed so RSS
     * channel data such as {@code ttl} can be read.
     *
     * @return the parsed feed, or null (after logging an error) if it could not be fetched or parsed
     */
    public SyndFeed getFeed(String str) {
        try {
            URLConnection connection = new URL(str).openConnection();
            connection.addRequestProperty("User-Agent", "RSS River for Elasticsearch (https://github.com/dadoonet/rssriver)");
            SyndFeedInput input = new SyndFeedInput();
            input.setPreserveWireFeed(true);
            // Close the reader, and with it the connection's input stream, once parsed.
            try (XmlReader reader = new XmlReader(connection)) {
                return input.build(reader);
            }
        } catch (IllegalArgumentException e) {
            this.logger.error("Feed from [{}] is incorrect.", new Object[]{str});
            return null;
        } catch (MalformedURLException e2) {
            this.logger.error("RSS Url is incorrect : [{}].", new Object[]{str});
            return null;
        } catch (FeedException e3) {
            this.logger.error("Can not parse feed from [{}].", new Object[]{str});
            return null;
        } catch (IOException e4) {
            this.logger.error("Can not read feed from [{}].", new Object[]{str});
            return null;
        }
    }

    /** Returns whether index {@code str} exists in the cluster state and has a mapping for type {@code str2}. */
    private boolean isMappingExist(String str, String str2) {
        IndexMetaData index = ((ClusterStateResponse) this.client.admin().cluster().prepareState().setIndices(new String[]{str}).execute().actionGet()).getState().getMetaData().index(str);
        return index != null && index.mapping(str2) != null;
    }

    /**
     * Puts {@code xContentBuilder} as the mapping of type {@code str2} in index {@code str}
     * unless a mapping for that type already exists; a null builder is ignored.
     *
     * @throws Exception if the put-mapping request is not acknowledged
     */
    private void pushMapping(String str, String str2, XContentBuilder xContentBuilder) throws Exception {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("pushMapping(" + str + "," + str2 + ")", new Object[0]);
        }
        if (!isMappingExist(str, str2)) {
            this.logger.debug("Mapping [" + str + "]/[" + str2 + "] doesn't exist. Creating it.", new Object[0]);
            if (xContentBuilder != null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Mapping for [" + str + "]/[" + str2 + "]=" + xContentBuilder.string(), new Object[0]);
                }
                if (!((PutMappingResponse) this.client.admin().indices().preparePutMapping(new String[]{str}).setType(str2).setSource(xContentBuilder).execute().actionGet()).isAcknowledged()) {
                    throw new Exception("Could not define mapping for type [" + str + "]/[" + str2 + "].");
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Mapping definition for [" + str + "]/[" + str2 + "] succesfully created.", new Object[0]);
                }
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug("No mapping definition for [" + str + "]/[" + str2 + "]. Ignoring.", new Object[0]);
            }
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("Mapping [" + str + "]/[" + str2 + "] already exists.", new Object[0]);
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("/pushMapping(" + str + "," + str2 + ")", new Object[0]);
        }
    }
}
