package co.cask.cdap.logging.write;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/write/AvroFileWriter.class */
public final class AvroFileWriter implements Closeable, Flushable {
    private static final Logger LOG = LoggerFactory.getLogger(AvroFileWriter.class);
    private final FileMetaDataManager fileMetaDataManager;
    private final CConfiguration cConf;
    private final Location rootDir;
    private final String logBaseDir;
    private final Schema schema;
    private final int syncIntervalBytes;
    private final long maxFileSize;
    private final long inactiveIntervalMs;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Map<String, AvroFile> fileMap = Maps.newHashMap();

    /* loaded from: input_file:co/cask/cdap/logging/write/AvroFileWriter$AvroFile.class */
    public class AvroFile implements Closeable {
        private final Location location;
        private FSDataOutputStream outputStream;
        private DataFileWriter<GenericRecord> dataFileWriter;
        private long lastModifiedTs;
        private boolean isOpen = false;

        public AvroFile(Location location) {
            this.location = location;
        }

        void open() throws IOException {
            this.outputStream = new FSDataOutputStream(this.location.getOutputStream(), (FileSystem.Statistics) null);
            this.dataFileWriter = new DataFileWriter<>(new GenericDatumWriter(AvroFileWriter.this.schema));
            this.dataFileWriter.create(AvroFileWriter.this.schema, this.outputStream);
            this.dataFileWriter.setSyncInterval(AvroFileWriter.this.syncIntervalBytes);
            this.lastModifiedTs = System.currentTimeMillis();
            this.isOpen = true;
        }

        public Location getLocation() {
            return this.location;
        }

        public void append(LogWriteEvent logWriteEvent) throws IOException {
            this.dataFileWriter.append(logWriteEvent.getGenericRecord());
            this.lastModifiedTs = System.currentTimeMillis();
        }

        public long getPos() throws IOException {
            return this.outputStream.getPos();
        }

        public long getLastModifiedTs() {
            return this.lastModifiedTs;
        }

        public void flush() throws IOException {
            this.dataFileWriter.flush();
            this.outputStream.hflush();
        }

        public void sync() throws IOException {
            this.dataFileWriter.flush();
            this.outputStream.hsync();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.isOpen) {
                try {
                    if (this.dataFileWriter != null) {
                        this.dataFileWriter.close();
                    }
                    this.isOpen = false;
                } finally {
                    if (this.outputStream != null) {
                        this.outputStream.close();
                    }
                }
            }
        }
    }

    public AvroFileWriter(FileMetaDataManager fileMetaDataManager, CConfiguration cConfiguration, Location location, String str, Schema schema, long j, int i, long j2) {
        this.fileMetaDataManager = fileMetaDataManager;
        this.cConf = cConfiguration;
        this.rootDir = location;
        this.logBaseDir = str;
        this.schema = schema;
        this.syncIntervalBytes = i;
        this.maxFileSize = j;
        this.inactiveIntervalMs = j2;
    }

    public void append(List<? extends LogWriteEvent> list) throws Exception {
        if (list.isEmpty()) {
            LOG.debug("Empty append list.");
            return;
        }
        LogWriteEvent logWriteEvent = list.get(0);
        LoggingContext loggingContext = logWriteEvent.getLoggingContext();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Appending {} messages for logging context {}", Integer.valueOf(list.size()), loggingContext.getLogPathFragment(this.logBaseDir));
        }
        long timeStamp = logWriteEvent.getLogEvent().getTimeStamp();
        AvroFile rotateFile = rotateFile(getAvroFile(loggingContext, timeStamp), loggingContext, timeStamp);
        Iterator<? extends LogWriteEvent> it = list.iterator();
        while (it.hasNext()) {
            rotateFile.append(it.next());
        }
        rotateFile.flush();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            try {
                flush();
            } catch (Exception e) {
                LOG.error("Caught exception while checkpointing", e);
            }
            LOG.info("Closing all files");
            for (Map.Entry<String, AvroFile> entry : this.fileMap.entrySet()) {
                try {
                    entry.getValue().close();
                } catch (Throwable th) {
                    LOG.error(String.format("Caught exception while closing file %s", entry.getValue().getLocation().toURI()), th);
                }
            }
            this.fileMap.clear();
        }
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Map.Entry<String, AvroFile>> it = this.fileMap.entrySet().iterator();
        while (it.hasNext()) {
            AvroFile value = it.next().getValue();
            value.sync();
            if (currentTimeMillis - value.getLastModifiedTs() > this.inactiveIntervalMs) {
                value.close();
                it.remove();
            }
        }
    }

    private AvroFile getAvroFile(LoggingContext loggingContext, long j) throws Exception {
        AvroFile avroFile = this.fileMap.get(loggingContext.getLogPathFragment(this.logBaseDir));
        if (avroFile == null) {
            avroFile = createAvroFile(loggingContext, j);
        }
        return avroFile;
    }

    private AvroFile createAvroFile(LoggingContext loggingContext, long j) throws Exception {
        Location createLocation = createLocation(loggingContext.getLogPathFragment(this.logBaseDir), System.currentTimeMillis());
        LOG.info(String.format("Creating Avro file %s", createLocation.toURI()));
        AvroFile avroFile = new AvroFile(createLocation);
        try {
            avroFile.open();
            try {
                this.fileMetaDataManager.writeMetaData(loggingContext, j, createLocation);
                this.fileMap.put(loggingContext.getLogPathFragment(this.logBaseDir), avroFile);
                return avroFile;
            } catch (Throwable th) {
                closeAndDelete(avroFile);
                throw new IOException(th);
            }
        } catch (IOException e) {
            closeAndDelete(avroFile);
            throw e;
        }
    }

    private Location createLocation(String str, long j) throws IOException {
        return this.rootDir.append(this.cConf.get("namespaces.dir")).append(str).append(new SimpleDateFormat("yyyy-MM-dd").format(new Date())).append(String.format("%s.avro", Long.valueOf(j)));
    }

    private AvroFile rotateFile(AvroFile avroFile, LoggingContext loggingContext, long j) throws Exception {
        if (avroFile.getPos() <= this.maxFileSize) {
            return avroFile;
        }
        LOG.info(String.format("Rotating file %s", avroFile.getLocation().toURI()));
        flush();
        avroFile.close();
        return createAvroFile(loggingContext, j);
    }

    private void closeAndDelete(AvroFile avroFile) {
        try {
            avroFile.close();
            if (avroFile.getLocation().exists()) {
                avroFile.getLocation().delete();
            }
        } catch (IOException e) {
            LOG.error("Error while closing and deleting file {}", avroFile.getLocation(), e);
        }
    }
}
