package org.apache.falcon.service;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.FeedInstanceStatus;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-prism-0.8-classes.jar:org/apache/falcon/service/FeedSLAMonitoringService.class */
public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
    private static final int ONE_MS = 1;
    private int queueSize;
    private Set<String> monitoredFeeds;
    private Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances;
    private Date lastCheckedAt;
    private Date lastSerializedAt;
    private int statusCheckFrequencySeconds;
    private int lookAheadWindowMillis;
    private int serializationFrequencyMillis;
    private FileSystem fileSystem;
    private Path storePath;
    private Path filePath;
    private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
    private static final String ONE_HOUR = String.valueOf(DateUtils.MILLIS_IN_HOUR);
    private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService();
    private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

    /* loaded from: input_file:WEB-INF/lib/falcon-prism-0.8-classes.jar:org/apache/falcon/service/FeedSLAMonitoringService$Monitor.class */
    private class Monitor implements Runnable {
        private Monitor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!FeedSLAMonitoringService.this.monitoredFeeds.isEmpty()) {
                    FeedSLAMonitoringService.this.checkPendingInstanceAvailability();
                    Date date = new Date();
                    Date date2 = new Date(date.getTime() + FeedSLAMonitoringService.this.lookAheadWindowMillis);
                    FeedSLAMonitoringService.this.addNewPendingFeedInstances(FeedSLAMonitoringService.this.lastCheckedAt, date2);
                    FeedSLAMonitoringService.this.lastCheckedAt = date2;
                    if (date.getTime() - FeedSLAMonitoringService.this.lastSerializedAt.getTime() > FeedSLAMonitoringService.this.serializationFrequencyMillis) {
                        FeedSLAMonitoringService.this.serializeState();
                        FeedSLAMonitoringService.this.lastSerializedAt = new Date();
                    }
                }
            } catch (Throwable th) {
                FeedSLAMonitoringService.LOG.error("Feed SLA monitoring failed: ", th);
            }
        }
    }

    private FeedSLAMonitoringService() {
    }

    public static FeedSLAMonitoringService get() {
        return SERVICE;
    }

    @Override // org.apache.falcon.service.ConfigurationChangeListener
    public void onAdd(Entity entity) throws FalconException {
        if (entity.getEntityType() == EntityType.FEED) {
            Feed feed = (Feed) entity;
            if (feed.getLocations() != null) {
                Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
                for (Cluster cluster : feed.getClusters().getClusters()) {
                    if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
                        LOG.debug("Adding feed:{} for monitoring", feed.getName());
                        this.monitoredFeeds.add(feed.getName());
                    }
                }
            }
        }
    }

    @Override // org.apache.falcon.service.ConfigurationChangeListener
    public void onRemove(Entity entity) throws FalconException {
        if (entity.getEntityType() == EntityType.FEED) {
            Feed feed = (Feed) entity;
            if (feed.getLocations() != null) {
                Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
                for (Cluster cluster : feed.getClusters().getClusters()) {
                    if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
                        this.monitoredFeeds.remove(feed.getName());
                        this.pendingInstances.remove(new Pair(feed.getName(), cluster.getName()));
                    }
                }
            }
        }
    }

    private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
        if (feed.getLocations() == null) {
            return false;
        }
        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
        for (Cluster cluster : feed.getClusters().getClusters()) {
            if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
                return true;
            }
        }
        return false;
    }

    @Override // org.apache.falcon.service.ConfigurationChangeListener
    public void onChange(Entity entity, Entity entity2) throws FalconException {
        if (entity2.getEntityType() == EntityType.FEED) {
            Feed feed = (Feed) entity;
            Feed feed2 = (Feed) entity2;
            if (!isSLAMonitoringEnabledInCurrentColo(feed2)) {
                onRemove(feed);
                return;
            }
            if (!isSLAMonitoringEnabledInCurrentColo(feed)) {
                onAdd(feed2);
                return;
            }
            ArrayList arrayList = new ArrayList();
            for (String str : EntityUtil.getClustersDefinedInColos(feed)) {
                if (FeedHelper.getSLA(str, feed) != null && FeedHelper.getSLA(str, feed2) == null) {
                    arrayList.add(str);
                }
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.pendingInstances.remove(new Pair(feed2.getName(), (String) it.next()));
            }
        }
    }

    @Override // org.apache.falcon.service.ConfigurationChangeListener
    public void onReload(Entity entity) throws FalconException {
        onAdd(entity);
    }

    @Override // org.apache.falcon.service.FalconService
    public String getName() {
        return FeedSLAMonitoringService.class.getSimpleName();
    }

    @Override // org.apache.falcon.service.FalconService
    public void init() throws FalconException {
        this.storePath = new Path(StartupProperties.get().getProperty("feed.sla.service.store.uri"));
        this.filePath = new Path(this.storePath, "feedSLAMonitoringService");
        this.fileSystem = initializeFileSystem();
        this.serializationFrequencyMillis = Integer.valueOf(StartupProperties.get().getProperty("feed.sla.serialization.frequency.millis", ONE_HOUR)).intValue();
        this.statusCheckFrequencySeconds = Integer.valueOf(StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600")).intValue();
        this.lookAheadWindowMillis = Integer.valueOf(StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000")).intValue();
        this.queueSize = Integer.valueOf(StartupProperties.get().getProperty("feed.sla.queue.size", "288")).intValue();
        try {
            if (this.fileSystem.exists(this.filePath)) {
                deserialize(this.filePath);
            } else {
                LOG.debug("No old state exists at: {}, Initializing a clean state.", this.filePath.toString());
                initializeService();
            }
            new ScheduledThreadPoolExecutor(1).scheduleWithFixedDelay(new Monitor(), 0L, this.statusCheckFrequencySeconds, TimeUnit.SECONDS);
        } catch (IOException e) {
            throw new FalconException("Couldn't check the existence of " + this.filePath, e);
        }
    }

    @Override // org.apache.falcon.service.FalconService
    public void destroy() throws FalconException {
        serializeState();
    }

    private FileSystem initializeFileSystem() {
        try {
            this.fileSystem = HadoopClientFactory.get().createFalconFileSystem(this.storePath.toUri());
            if (!this.fileSystem.exists(this.storePath)) {
                LOG.info("Creating directory for pending feed instances: {}", this.storePath);
                HadoopClientFactory.mkdirs(this.fileSystem, this.storePath, STORE_PERMISSION);
            }
            return this.fileSystem;
        } catch (Exception e) {
            throw new RuntimeException("Unable to bring up feed sla store for path: " + this.storePath, e);
        }
    }

    void addNewPendingFeedInstances(Date date, Date date2) throws FalconException {
        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
        Iterator<String> it = this.monitoredFeeds.iterator();
        while (it.hasNext()) {
            Feed feed = (Feed) EntityUtil.getEntity(EntityType.FEED, it.next());
            for (Cluster cluster : feed.getClusters().getClusters()) {
                if (currentClusters.contains(cluster.getName())) {
                    Pair<String, String> pair = new Pair<>(feed.getName(), cluster.getName());
                    BlockingQueue<Date> blockingQueue = this.pendingInstances.get(pair);
                    if (blockingQueue == null) {
                        blockingQueue = new LinkedBlockingQueue(this.queueSize);
                    }
                    HashSet hashSet = new HashSet(blockingQueue);
                    org.apache.falcon.entity.v0.cluster.Cluster cluster2 = (org.apache.falcon.entity.v0.cluster.Cluster) EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
                    Date nextStartTime = EntityUtil.getNextStartTime(feed, cluster2, date);
                    while (true) {
                        Date date3 = nextStartTime;
                        if (!date3.before(date2)) {
                            break;
                        }
                        if (blockingQueue.size() >= this.queueSize) {
                            LOG.debug("Removing instance={} for <feed,cluster>={}", blockingQueue.peek(), pair);
                            hashSet.remove(blockingQueue.peek());
                            blockingQueue.remove();
                        }
                        LOG.debug("Adding instance={} for <feed,cluster>={}", date3, pair);
                        if (hashSet.add(date3)) {
                            blockingQueue.add(date3);
                        }
                        nextStartTime = EntityUtil.getNextStartTime(feed, cluster2, new Date(date3.getTime() + 1));
                    }
                    this.pendingInstances.put(pair, blockingQueue);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkPendingInstanceAvailability() throws FalconException {
        for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry : this.pendingInstances.entrySet()) {
            for (Date date : entry.getValue()) {
                if (checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second, date)) {
                    this.pendingInstances.get(entry.getKey()).remove(date);
                }
            }
        }
    }

    private boolean checkFeedInstanceAvailability(String str, String str2, Date date) throws FalconException {
        Feed feed = (Feed) EntityUtil.getEntity(EntityType.FEED, str);
        try {
            LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}", feed.getName(), str2, date);
            FeedInstanceStatus.AvailabilityStatus feedInstanceStatus = FeedHelper.getFeedInstanceStatus(feed, str2, date);
            if (feedInstanceStatus.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE) || feedInstanceStatus.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) {
                LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.", feed.getName(), str2, date);
                return true;
            }
        } catch (Throwable th) {
            LOG.error("Couldn't find status for feed:{}, cluster:{}", str, str2, th);
        }
        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", feed.getName(), str2, date);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void serializeState() throws FalconException {
        LOG.info("Saving context to: [{}]", this.storePath);
        Path path = new Path(this.storePath, "tmp");
        ObjectOutputStream objectOutputStream = null;
        try {
            try {
                objectOutputStream = new ObjectOutputStream(this.fileSystem.create(path));
                HashMap hashMap = new HashMap();
                hashMap.put("lastSerializedAt", Long.valueOf(this.lastSerializedAt.getTime()));
                hashMap.put("lastCheckedAt", Long.valueOf(this.lastCheckedAt.getTime()));
                hashMap.put("pendingInstances", this.pendingInstances);
                objectOutputStream.writeObject(hashMap);
                this.fileSystem.rename(path, this.filePath);
                IOUtils.closeQuietly((OutputStream) objectOutputStream);
            } catch (IOException e) {
                throw new FalconException("Error serializing context to : " + this.storePath.toUri(), e);
            }
        } catch (Throwable th) {
            IOUtils.closeQuietly((OutputStream) objectOutputStream);
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void deserialize(Path path) throws FalconException {
        try {
            Map<String, Object> deserializeInternal = deserializeInternal(path);
            this.pendingInstances = new ConcurrentHashMap();
            for (Map.Entry entry : ((Map) deserializeInternal.get("pendingInstances")).entrySet()) {
                LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(this.queueSize);
                BlockingQueue blockingQueue = (BlockingQueue) entry.getValue();
                LOG.debug("Number of old instances:{}, new queue size:{}", Integer.valueOf(blockingQueue.size()), Integer.valueOf(this.queueSize));
                while (!blockingQueue.isEmpty()) {
                    Date date = (Date) blockingQueue.remove();
                    if (linkedBlockingQueue.size() == this.queueSize) {
                        LOG.debug("Deserialization: Removing value={} for <feed,cluster>={}", linkedBlockingQueue.peek(), entry.getKey());
                        linkedBlockingQueue.remove();
                    }
                    LOG.debug("Deserialization Adding: key={} to <feed,cluster>={}", entry.getKey(), date);
                    linkedBlockingQueue.add(date);
                }
                this.pendingInstances.put(entry.getKey(), linkedBlockingQueue);
            }
            this.lastCheckedAt = new Date(((Long) deserializeInternal.get("lastCheckedAt")).longValue());
            this.lastSerializedAt = new Date(((Long) deserializeInternal.get("lastSerializedAt")).longValue());
            this.monitoredFeeds = new ConcurrentHashSet();
            LOG.debug("Restored the service from old state.");
        } catch (IOException | ClassNotFoundException e) {
            throw new FalconException("Couldn't deserialize the old state", e);
        }
    }

    private void initializeService() {
        this.pendingInstances = new ConcurrentHashMap();
        this.lastCheckedAt = new Date();
        this.lastSerializedAt = new Date();
        this.monitoredFeeds = new ConcurrentHashSet();
    }

    private Map<String, Object> deserializeInternal(Path path) throws IOException, ClassNotFoundException {
        ObjectInputStream objectInputStream = new ObjectInputStream(this.fileSystem.open(path));
        try {
            Map<String, Object> map = (Map) objectInputStream.readObject();
            IOUtils.closeQuietly((InputStream) objectInputStream);
            return map;
        } catch (Throwable th) {
            IOUtils.closeQuietly((InputStream) objectInputStream);
            throw th;
        }
    }

    public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date date, Date date2) throws FalconException {
        HashSet hashSet = new HashSet();
        for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry : this.pendingInstances.entrySet()) {
            Pair<String, String> key = entry.getKey();
            Feed feed = (Feed) EntityUtil.getEntity(EntityType.FEED, key.first);
            Sla sla = FeedHelper.getSLA(FeedHelper.getCluster(feed, key.second), feed);
            if (sla != null) {
                for (Pair<Date, String> pair : getSLAStatus(sla, date, date2, entry.getValue())) {
                    SchedulableEntityInstance schedulableEntityInstance = new SchedulableEntityInstance(key.first, key.second, pair.first, EntityType.FEED);
                    schedulableEntityInstance.setTags(pair.second);
                    hashSet.add(schedulableEntityInstance);
                }
            }
        }
        return hashSet;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String str, String str2, Date date, Date date2) throws FalconException {
        HashSet hashSet = new HashSet();
        Pair pair = new Pair(str, str2);
        BlockingQueue<Date> blockingQueue = this.pendingInstances.get(pair);
        Feed feed = (Feed) EntityUtil.getEntity(EntityType.FEED, str);
        Sla sla = FeedHelper.getSLA(FeedHelper.getCluster(feed, (String) pair.second), feed);
        if (blockingQueue != null && sla != null) {
            for (Pair<Date, String> pair2 : getSLAStatus(sla, date, date2, blockingQueue)) {
                SchedulableEntityInstance schedulableEntityInstance = new SchedulableEntityInstance(str, str2, pair2.first, EntityType.FEED);
                schedulableEntityInstance.setTags(pair2.second);
                hashSet.add(schedulableEntityInstance);
            }
        }
        return hashSet;
    }

    Set<Pair<Date, String>> getSLAStatus(Sla sla, Date date, Date date2, BlockingQueue<Date> blockingQueue) throws FalconException {
        Date date3 = new Date();
        Frequency slaLow = sla.getSlaLow();
        Frequency slaHigh = sla.getSlaHigh();
        HashSet hashSet = new HashSet();
        for (Date date4 : blockingQueue) {
            if (!date4.before(date) && !date4.after(date2)) {
                ExpressionHelper.setReferenceDate(date4);
                ExpressionHelper expressionHelper = ExpressionHelper.get();
                Long l = (Long) expressionHelper.evaluate(slaHigh.toString(), Long.class);
                Long l2 = (Long) expressionHelper.evaluate(slaLow.toString(), Long.class);
                Date date5 = new Date(date4.getTime() + l.longValue());
                Date date6 = new Date(date4.getTime() + l2.longValue());
                if (date5.before(date3)) {
                    hashSet.add(new Pair(date4, "Missed SLA High"));
                } else if (date6.before(date3)) {
                    hashSet.add(new Pair(date4, "Missed SLA Low"));
                }
            }
        }
        return hashSet;
    }
}
