package org.apache.streams.messaging.aggregation;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.messaging.service.impl.CassandraActivityService;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import org.springframework.scheduling.annotation.Scheduled;

/* loaded from: input_file:org/apache/streams/messaging/aggregation/ActivityAggregator.class */
/**
 * Periodically pulls new activities from the activity service and pushes them to every
 * subscriber held by the subscriber warehouse.
 *
 * <p>Both collaborators are supplied through setters and must be set before
 * {@link #distributeToSubscribers()} or {@link #updateSubscriber(ActivityStreamsSubscriber)}
 * is invoked; neither method checks for a missing collaborator.
 */
public class ActivityAggregator {
    private static final Log LOG = LogFactory.getLog(ActivityAggregator.class);

    private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
    private CassandraActivityService activityService;

    /**
     * Sets the warehouse whose subscribers are updated on every scheduled run.
     *
     * @param activityStreamsSubscriberWarehouse source of the subscribers to update
     */
    public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) {
        this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
    }

    /**
     * Sets the service that is queried for each subscriber's activities.
     *
     * @param cassandraActivityService service used to look up activities by filter
     */
    public void setActivityService(CassandraActivityService cassandraActivityService) {
        this.activityService = cassandraActivityService;
    }

    /**
     * Updates every subscriber returned by the warehouse. Scheduled at a fixed rate of
     * 30000 ms.
     *
     * <p>A runtime failure while updating one subscriber is logged and does not prevent the
     * remaining subscribers from being updated in the same run.
     */
    @Scheduled(fixedRate = 30000)
    public void distributeToSubscribers() {
        Iterator<?> subscribers = this.activityStreamsSubscriberWarehouse.getAllSubscribers().iterator();
        while (subscribers.hasNext()) {
            ActivityStreamsSubscriber subscriber = (ActivityStreamsSubscriber) subscribers.next();
            try {
                updateSubscriber(subscriber);
            } catch (RuntimeException e) {
                // Isolate failures: one broken subscriber must not starve the others.
                LOG.error("Failed to update a subscriber; continuing with the remaining subscribers", e);
            }
        }
    }

    /**
     * Fetches the activities matching the subscriber's configured filters since its last
     * update, de-duplicates and orders them through a {@link TreeSet}, records the new
     * last-updated time on the subscriber, and hands the activities over as a list.
     *
     * <p>The last-updated time stored on the subscriber is captured <em>before</em> the query
     * is issued. Stamping the subscriber with a time taken after the query would silently drop
     * any activity recorded between the query and the stamp. The trade-off is that an activity
     * recorded during the query may be delivered again on the next run.
     * NOTE(review): this assumes the service compares activity times against the supplied
     * date on a clock comparable to this JVM's — confirm.
     *
     * @param activityStreamsSubscriber the subscriber to refresh
     */
    // The element types of the service result and of receive(...) are declared outside this
    // class, so the collections are intentionally left raw here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void updateSubscriber(ActivityStreamsSubscriber activityStreamsSubscriber) {
        // Take the cutoff first so nothing recorded while the query runs can be skipped next time.
        Date fetchStarted = new Date();

        TreeSet orderedActivities = new TreeSet();
        orderedActivities.addAll(this.activityService.getActivitiesForFilters(
                activityStreamsSubscriber.getActivityStreamsSubscriberConfiguration().getFilters(),
                activityStreamsSubscriber.getLastUpdated()));

        activityStreamsSubscriber.setLastUpdated(fetchStarted);
        activityStreamsSubscriber.receive(new ArrayList(orderedActivities));
    }
}
