package co.cask.cdap.data.stream.service;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data.stream.StreamFileJanitor;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:co/cask/cdap/data/stream/service/LocalStreamFileJanitorService.class */
public final class LocalStreamFileJanitorService extends AbstractService implements StreamFileJanitorService {
    private static final Logger LOG = LoggerFactory.getLogger(LocalStreamFileJanitorService.class);
    private final StreamFileJanitor janitor;
    private final long cleanupPeriod;
    private ScheduledExecutorService executor;

    @Inject
    public LocalStreamFileJanitorService(StreamFileJanitor streamFileJanitor, CConfiguration cConfiguration) {
        this.janitor = streamFileJanitor;
        this.cleanupPeriod = cConfiguration.getLong("stream.file.cleanup.period");
    }

    protected void doStart() {
        this.executor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("stream-cleanup"));
        this.executor.submit(new Runnable() { // from class: co.cask.cdap.data.stream.service.LocalStreamFileJanitorService.1
            @Override // java.lang.Runnable
            public void run() {
                if (LocalStreamFileJanitorService.this.state() != Service.State.RUNNING) {
                    LocalStreamFileJanitorService.LOG.info("Janitor already stopped");
                    return;
                }
                LocalStreamFileJanitorService.LOG.debug("Execute stream file cleanup.");
                try {
                    try {
                        LocalStreamFileJanitorService.this.janitor.cleanAll();
                        LocalStreamFileJanitorService.LOG.debug("Completed stream file cleanup.");
                        long currentTimeMillis = System.currentTimeMillis();
                        long j = (((currentTimeMillis / LocalStreamFileJanitorService.this.cleanupPeriod) + 1) * LocalStreamFileJanitorService.this.cleanupPeriod) - currentTimeMillis;
                        if (j <= 0) {
                            LocalStreamFileJanitorService.this.executor.submit(this);
                        } else {
                            LocalStreamFileJanitorService.LOG.debug("Schedule stream file cleanup in {} ms", Long.valueOf(j));
                            LocalStreamFileJanitorService.this.executor.schedule(this, j, TimeUnit.MILLISECONDS);
                        }
                    } catch (Throwable th) {
                        LocalStreamFileJanitorService.LOG.warn("Failed to cleanup stream file: {}", th.getMessage());
                        LocalStreamFileJanitorService.LOG.debug("Failed to cleanup stream file.", th);
                        long currentTimeMillis2 = System.currentTimeMillis();
                        long j2 = (((currentTimeMillis2 / LocalStreamFileJanitorService.this.cleanupPeriod) + 1) * LocalStreamFileJanitorService.this.cleanupPeriod) - currentTimeMillis2;
                        if (j2 <= 0) {
                            LocalStreamFileJanitorService.this.executor.submit(this);
                        } else {
                            LocalStreamFileJanitorService.LOG.debug("Schedule stream file cleanup in {} ms", Long.valueOf(j2));
                            LocalStreamFileJanitorService.this.executor.schedule(this, j2, TimeUnit.MILLISECONDS);
                        }
                    }
                } catch (Throwable th2) {
                    long currentTimeMillis3 = System.currentTimeMillis();
                    long j3 = (((currentTimeMillis3 / LocalStreamFileJanitorService.this.cleanupPeriod) + 1) * LocalStreamFileJanitorService.this.cleanupPeriod) - currentTimeMillis3;
                    if (j3 <= 0) {
                        LocalStreamFileJanitorService.this.executor.submit(this);
                    } else {
                        LocalStreamFileJanitorService.LOG.debug("Schedule stream file cleanup in {} ms", Long.valueOf(j3));
                        LocalStreamFileJanitorService.this.executor.schedule(this, j3, TimeUnit.MILLISECONDS);
                    }
                    throw th2;
                }
            }
        });
        notifyStarted();
    }

    protected void doStop() {
        this.executor.submit(new Runnable() { // from class: co.cask.cdap.data.stream.service.LocalStreamFileJanitorService.2
            @Override // java.lang.Runnable
            public void run() {
                LocalStreamFileJanitorService.LOG.debug("Stream file janitor stopped");
                LocalStreamFileJanitorService.this.notifyStopped();
            }
        });
        this.executor.shutdown();
    }
}
