package co.cask.cdap.report.main;

import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.report.util.Constants;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/report/main/RunMetaFileManager.class */
public class RunMetaFileManager {
    private static final String SYNC_INTERVAL = "file.sync.interval.bytes";
    private static final String MAX_FILE_SIZE_BYTES = "file.max.size.bytes";
    private static final String MAX_FILE_OPEN_DURATION_MILLIS = "file.max.open.duration.millis";
    private final int syncIntervalBytes;
    private final int maxFileSizeBytes;
    private final long maxFileOpenDurationMillis;
    private final Metrics metrics;
    private Map<String, RunMetaFileOutputStream> namespaceToLogFileStreamMap = new HashMap();
    private Location baseLocation;
    private long lastSyncTime;
    private static final Logger LOG = LoggerFactory.getLogger(RunMetaFileManager.class);
    private static final Integer DEFAULT_MAX_FILE_SIZE_BYTES = 67108864;
    private static final Integer DEFAULT_SYNC_INTERVAL_BYTES = 10485760;
    private static final Long DEFAULT_MAX_FILE_OPEN_DURATION = Long.valueOf(TimeUnit.HOURS.toMillis(6));
    private static final long SYNC_INTERVAL_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/report/main/RunMetaFileManager$Callable.class */
    public interface Callable {
        void call() throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RunMetaFileManager(Location location, Map<String, String> map, Metrics metrics) {
        this.baseLocation = location;
        this.syncIntervalBytes = map.containsKey(SYNC_INTERVAL) ? Integer.parseInt(map.get(SYNC_INTERVAL)) : DEFAULT_SYNC_INTERVAL_BYTES.intValue();
        this.maxFileSizeBytes = map.containsKey(MAX_FILE_SIZE_BYTES) ? Integer.parseInt(map.get(MAX_FILE_SIZE_BYTES)) : DEFAULT_MAX_FILE_SIZE_BYTES.intValue();
        this.maxFileOpenDurationMillis = map.containsKey(MAX_FILE_OPEN_DURATION_MILLIS) ? Integer.parseInt(map.get(MAX_FILE_OPEN_DURATION_MILLIS)) : DEFAULT_MAX_FILE_OPEN_DURATION.longValue();
        this.lastSyncTime = System.currentTimeMillis();
        this.metrics = metrics;
    }

    public void append(ProgramRunInfo programRunInfo) throws InterruptedException {
        if (!this.namespaceToLogFileStreamMap.containsKey(programRunInfo.getNamespace())) {
            createLogFileOutputStreamWithRetry(programRunInfo.getNamespace(), programRunInfo.getTimestamp());
        }
        rotateOutputStreamIfNeeded(this.namespaceToLogFileStreamMap.get(programRunInfo.getNamespace()), programRunInfo.getNamespace(), programRunInfo.getTimestamp());
        appendAndFlushWithRetry(this.namespaceToLogFileStreamMap.get(programRunInfo.getNamespace()), programRunInfo);
        syncOutputStreamsIfRequired();
    }

    public void syncOutputStreamsIfRequired() throws InterruptedException {
        if (System.currentTimeMillis() - this.lastSyncTime > SYNC_INTERVAL_TIME_MILLIS) {
            for (RunMetaFileOutputStream runMetaFileOutputStream : this.namespaceToLogFileStreamMap.values()) {
                retryWithCallable(() -> {
                    runMetaFileOutputStream.sync();
                }, "sync");
            }
            this.lastSyncTime = System.currentTimeMillis();
            LOG.debug("Last sync time {}", Long.valueOf(this.lastSyncTime));
            this.metrics.gauge(Constants.Metrics.SYNC_INTERVAL_TIME_MILLIS_METRIC, this.lastSyncTime);
        }
    }

    public void cleanup() {
        Iterator<RunMetaFileOutputStream> it = this.namespaceToLogFileStreamMap.values().iterator();
        while (it.hasNext()) {
            Closeables.closeQuietly(it.next());
        }
    }

    private void createLogFileOutputStreamWithRetry(String str, Long l) throws InterruptedException {
        while (!createLogFileOutputStream(str, l)) {
            LOG.warn("Failed to create log file for the namespace {} and timestamp {}", str, l);
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        LOG.info("Successfully created log file for the namespace {} and timestamp {}", str, l);
    }

    private boolean createLogFileOutputStream(String str, Long l) {
        try {
            Location append = getOrCreateAndGet(str).append(String.format("%s-%s.avro", l, Long.valueOf(System.currentTimeMillis())));
            boolean createNew = append.createNew();
            if (createNew) {
                this.namespaceToLogFileStreamMap.put(str, new RunMetaFileOutputStream(append, "", this.syncIntervalBytes, System.currentTimeMillis(), () -> {
                    this.namespaceToLogFileStreamMap.remove(str);
                }));
            }
            return createNew;
        } catch (IOException e) {
            LOG.warn("Exception while trying to create file location ", e);
            return false;
        }
    }

    private void rotateOutputStreamIfNeeded(RunMetaFileOutputStream runMetaFileOutputStream, String str, Long l) throws InterruptedException {
        boolean z = System.currentTimeMillis() - runMetaFileOutputStream.getCreateTime() > this.maxFileOpenDurationMillis;
        if (runMetaFileOutputStream.getSize() > this.maxFileSizeBytes || z) {
            Closeables.closeQuietly(runMetaFileOutputStream);
            createLogFileOutputStreamWithRetry(str, l);
        }
    }

    private Location getOrCreateAndGet(String str) throws IOException {
        for (Location location : this.baseLocation.list()) {
            if (location.getName().equals(str)) {
                return location;
            }
        }
        Location append = this.baseLocation.append(str);
        append.mkdirs();
        return append;
    }

    private void appendAndFlushWithRetry(RunMetaFileOutputStream runMetaFileOutputStream, ProgramRunInfo programRunInfo) throws InterruptedException {
        retryWithCallable(() -> {
            runMetaFileOutputStream.append(programRunInfo);
        }, "append");
        retryWithCallable(() -> {
            runMetaFileOutputStream.flush();
        }, "flush");
    }

    private void retryWithCallable(Callable callable, String str) throws InterruptedException {
        SampledLogging sampledLogging = new SampledLogging(LOG, 100);
        boolean z = false;
        while (!z) {
            try {
                callable.call();
                z = true;
            } catch (IOException e) {
                sampledLogging.logWarning(String.format("Exception while performing %s : ", str), e);
                TimeUnit.MILLISECONDS.sleep(10L);
            }
        }
    }
}
