package co.cask.cdap.internal.app.runtime.schedule;

import co.cask.cdap.api.metrics.MetricType;
import co.cask.cdap.api.metrics.MetricValues;
import co.cask.cdap.common.stream.notification.StreamSizeNotification;
import co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase;
import co.cask.cdap.notifications.service.NotificationService;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.id.NotificationFeedId;
import co.cask.cdap.test.XSlowTests;
import com.google.common.collect.ImmutableMap;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/StreamSizeSchedulerTest.class */
public class StreamSizeSchedulerTest extends SchedulerTestBase {
    private static NotificationService notificationService;

    @BeforeClass
    public static void setup() throws Exception {
        notificationService = (NotificationService) injector.getInstance(NotificationService.class);
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase
    protected SchedulerTestBase.StreamMetricsPublisher createMetricsPublisher(final Id.Stream stream) {
        final NotificationFeedId notificationFeedId = new NotificationFeedId(stream.getNamespaceId(), "stream", stream.getId() + "Size");
        return new SchedulerTestBase.StreamMetricsPublisher() { // from class: co.cask.cdap.internal.app.runtime.schedule.StreamSizeSchedulerTest.1
            long totalSize;

            @Override // co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.StreamMetricsPublisher
            public void increment(long j) throws Exception {
                SchedulerTestBase.metricStore.add(new MetricValues(ImmutableMap.of("ns", stream.getNamespaceId(), "str", stream.getId()), "collect.bytes", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), j, MetricType.COUNTER));
                this.totalSize += j;
                StreamSizeSchedulerTest.notificationService.publish(notificationFeedId, new StreamSizeNotification(System.currentTimeMillis(), this.totalSize));
            }
        };
    }
}
