package co.cask.cdap.logging.write;

import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.common.security.Impersonator;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/write/AvroFileWriter.class */
public final class AvroFileWriter implements Closeable, Flushable {
    private static final Logger LOG = LoggerFactory.getLogger(AvroFileWriter.class);
    private final FileMetaDataManager fileMetaDataManager;
    private final NamespacedLocationFactory namespacedLocationFactory;
    private final String logBaseDir;
    private final Schema schema;
    private final int syncIntervalBytes;
    private final long maxFileSize;
    private final long maxFileLifetimeMs;
    private final Impersonator impersonator;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Map<String, AvroFile> fileMap = Maps.newHashMap();

    /* loaded from: input_file:co/cask/cdap/logging/write/AvroFileWriter$AvroFile.class */
    public class AvroFile implements Closeable {
        private final Location location;
        private FSDataOutputStream outputStream;
        private DataFileWriter<GenericRecord> dataFileWriter;
        private long createTime;
        private boolean isOpen = false;

        public AvroFile(Location location) {
            this.location = location;
        }

        void open() throws IOException {
            try {
                this.outputStream = new FSDataOutputStream(this.location.getOutputStream(), (FileSystem.Statistics) null);
                this.dataFileWriter = new DataFileWriter<>(new GenericDatumWriter(AvroFileWriter.this.schema));
                this.dataFileWriter.create(AvroFileWriter.this.schema, this.outputStream);
                this.dataFileWriter.setSyncInterval(AvroFileWriter.this.syncIntervalBytes);
                this.createTime = System.currentTimeMillis();
                sync();
                this.isOpen = true;
            } catch (Exception e) {
                close();
                throw new IOException("Exception while creating file " + this.location, e);
            }
        }

        public boolean isOpen() {
            return this.isOpen;
        }

        public Location getLocation() {
            return this.location;
        }

        public void append(LogWriteEvent logWriteEvent) throws IOException {
            try {
                this.dataFileWriter.append(logWriteEvent.getGenericRecord());
            } catch (Exception e) {
                close();
                throw new IOException("Exception while appending to file " + this.location, e);
            }
        }

        public long getPos() throws IOException {
            try {
                return this.outputStream.getPos();
            } catch (Exception e) {
                close();
                throw new IOException("Exception while getting position of file " + this.location, e);
            }
        }

        public long getCreateTime() {
            return this.createTime;
        }

        public void flush() throws IOException {
            try {
                this.dataFileWriter.flush();
                this.outputStream.hflush();
            } catch (Exception e) {
                close();
                throw new IOException("Exception while flushing file {}" + this.location, e);
            }
        }

        public void sync() throws IOException {
            try {
                this.dataFileWriter.flush();
                this.outputStream.hsync();
            } catch (Exception e) {
                close();
                throw new IOException("Exception while syncing file {}" + this.location, e);
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.isOpen) {
                AvroFileWriter.LOG.trace("Closing file {}", this.location);
                this.isOpen = false;
                try {
                    if (this.dataFileWriter != null) {
                        this.dataFileWriter.close();
                    }
                } finally {
                    if (this.outputStream != null) {
                        this.outputStream.close();
                    }
                }
            }
        }
    }

    public AvroFileWriter(FileMetaDataManager fileMetaDataManager, NamespacedLocationFactory namespacedLocationFactory, String str, Schema schema, long j, int i, long j2, Impersonator impersonator) {
        this.fileMetaDataManager = fileMetaDataManager;
        this.namespacedLocationFactory = namespacedLocationFactory;
        this.logBaseDir = str;
        this.schema = schema;
        this.syncIntervalBytes = i;
        this.maxFileSize = j;
        this.maxFileLifetimeMs = j2;
        this.impersonator = impersonator;
    }

    public void append(List<? extends LogWriteEvent> list) throws Exception {
        if (list.isEmpty()) {
            LOG.debug("Empty append list.");
            return;
        }
        LogWriteEvent logWriteEvent = list.get(0);
        LoggingContext loggingContext = logWriteEvent.getLoggingContext();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Appending {} messages for logging context {}", Integer.valueOf(list.size()), loggingContext.getLogPathFragment(this.logBaseDir));
        }
        long timeStamp = logWriteEvent.getLogEvent().getTimeStamp();
        AvroFile rotateFile = rotateFile(getAvroFile(loggingContext, timeStamp), loggingContext, timeStamp);
        Iterator<? extends LogWriteEvent> it = list.iterator();
        while (it.hasNext()) {
            rotateFile.append(it.next());
        }
        rotateFile.flush();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            try {
                flush();
            } catch (Exception e) {
                LOG.error("Caught exception while checkpointing", e);
            }
            LOG.debug("Closing all files");
            for (Map.Entry<String, AvroFile> entry : this.fileMap.entrySet()) {
                try {
                    entry.getValue().close();
                } catch (Throwable th) {
                    LOG.error("Caught exception while closing file {}", entry.getValue().getLocation(), th);
                }
            }
            this.fileMap.clear();
        }
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Map.Entry<String, AvroFile>> it = this.fileMap.entrySet().iterator();
        while (it.hasNext()) {
            AvroFile value = it.next().getValue();
            if (value.isOpen()) {
                value.sync();
                if (currentTimeMillis - value.getCreateTime() > this.maxFileLifetimeMs) {
                    value.close();
                    it.remove();
                }
            } else {
                it.remove();
            }
        }
    }

    private AvroFile getAvroFile(LoggingContext loggingContext, long j) throws Exception {
        AvroFile avroFile = this.fileMap.get(loggingContext.getLogPathFragment(this.logBaseDir));
        if (avroFile != null && !avroFile.isOpen()) {
            avroFile = null;
        }
        if (avroFile == null) {
            avroFile = createAvroFile(loggingContext, j);
        }
        return avroFile;
    }

    private AvroFile createAvroFile(LoggingContext loggingContext, long j) throws IOException {
        Location createLocation = createLocation(loggingContext, System.currentTimeMillis());
        LOG.info("Creating Avro file {}", createLocation);
        AvroFile avroFile = new AvroFile(createLocation);
        try {
            avroFile.open();
            try {
                this.fileMetaDataManager.writeMetaData(loggingContext, j, createLocation);
                this.fileMap.put(loggingContext.getLogPathFragment(this.logBaseDir), avroFile);
                return avroFile;
            } catch (Throwable th) {
                closeAndDelete(avroFile);
                throw new IOException(th);
            }
        } catch (IOException e) {
            closeAndDelete(avroFile);
            throw e;
        }
    }

    private Location createLocation(LoggingContext loggingContext, long j) throws IOException {
        String format = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        String format2 = String.format("%s.avro", Long.valueOf(j));
        final NamespaceId namespaceId = LoggingContextHelper.getNamespaceId(loggingContext);
        try {
            return ((Location) this.impersonator.doAs(namespaceId, new Callable<Location>() { // from class: co.cask.cdap.logging.write.AvroFileWriter.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Location call() throws Exception {
                    return AvroFileWriter.this.namespacedLocationFactory.get(namespaceId.toId());
                }
            })).append(loggingContext.getLogPathFragment(this.logBaseDir).split(namespaceId.getNamespace())[1]).append(format).append(format2);
        } catch (IOException e) {
            throw e;
        } catch (Exception e2) {
            Throwables.propagateIfPossible(e2);
            LOG.warn("Unexpected exception while getting namespace location for namespace {}.", namespaceId, e2);
            throw Throwables.propagate(e2);
        }
    }

    private AvroFile rotateFile(AvroFile avroFile, LoggingContext loggingContext, long j) throws Exception {
        if (!avroFile.isOpen()) {
            LOG.info("Rotating a closed file {}", avroFile.getLocation());
            return createAvroFile(loggingContext, j);
        }
        if (avroFile.getPos() <= this.maxFileSize) {
            return avroFile;
        }
        LOG.info("Rotating file {}", avroFile.getLocation());
        avroFile.close();
        return createAvroFile(loggingContext, j);
    }

    private void closeAndDelete(AvroFile avroFile) {
        try {
            try {
                avroFile.close();
                if (avroFile.getLocation().exists()) {
                    avroFile.getLocation().delete();
                }
            } catch (Throwable th) {
                if (avroFile.getLocation().exists()) {
                    avroFile.getLocation().delete();
                }
                throw th;
            }
        } catch (IOException e) {
            LOG.error("Error while closing and deleting file {}", avroFile.getLocation(), e);
        }
    }
}
