package org.apache.flume.sink.hdfs;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.SystemClock;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.FlumeFormatter;
import org.apache.flume.sink.hdfs.HDFSEventSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flume/sink/hdfs/BucketWriter.class */
/**
 * Writes events for one bucket (one {@code filePath} prefix) through an {@link HDFSWriter}.
 *
 * <p>While a file is being written its name carries the {@link #IN_USE_EXT} suffix; when the
 * writer is closed the file is renamed to the name without that suffix. A new file is started
 * when the configured event count or byte size is reached, and a scheduled task closes the
 * current file after {@code rollInterval} seconds when that value is positive.
 *
 * <p>{@link #append(Event)}, {@link #flush()} and {@link #close()} are {@code synchronized} on
 * this instance. The scheduled roll and idle tasks go through {@link #close()} as well.
 */
public class BucketWriter {
    /** Suffix carried by the bucket file while it is still being written. */
    static final String IN_USE_EXT = ".tmp";

    private static final Logger LOG = LoggerFactory.getLogger(BucketWriter.class);

    /**
     * Monitor shared by every instance; held in {@link #doOpen()} while a new file is created.
     * A plain {@code Object} is used so the lock is not a boxed value.
     */
    private static final Object staticLock = new Object();

    private final HDFSWriter writer;
    private final FlumeFormatter formatter;
    /** Seconds after which an open file is closed by the scheduled roll task; {@code <= 0} disables it. */
    private final long rollInterval;
    /** Byte threshold checked by {@link #shouldRotate()}; {@code <= 0} disables it. */
    private final long rollSize;
    /** Event-count threshold checked by {@link #shouldRotate()}; {@code <= 0} disables it. */
    private final long rollCount;
    /** Number of appended events after which {@link #append(Event)} triggers a flush. */
    private final long batchSize;
    private final CompressionCodec codeC;
    private final SequenceFile.CompressionType compType;
    private final Context context;
    private final ScheduledExecutorService timedRollerPool;
    /** User to run file operations as; when {@code null} the actions run directly. */
    private final UserGroupInformation user;
    private final SinkCounter sinkCounter;
    private final HDFSEventSink.WriterCallback onIdleCallback;
    /** Seconds without a flush after which the idle task closes this writer; {@code <= 0} disables it. */
    private final int idleTimeout;

    /** Events appended to the current file since it was opened. */
    private long eventCounter;
    /** Sum of event body lengths appended to the current file since it was opened. */
    private long processSize;
    private FileSystem fileSystem;
    private volatile String filePath;
    private volatile String fileSuffix;
    /** Final (post-rename) path of the current file, without {@link #IN_USE_EXT}. */
    private volatile String bucketPath;
    /** Events appended since the last flush. */
    private volatile long batchCounter;
    private volatile ScheduledFuture<Void> timedRollFuture;
    private volatile ScheduledFuture<Void> idleFuture;

    // NOTE: clock must be initialised before fileExtensionCounter, which reads it.
    private Clock clock = new SystemClock();
    /**
     * Set to {@code true} by the idle task just before it closes this writer. Once set,
     * {@link #append(Event)} refuses to reopen a closed writer.
     */
    protected boolean idleClosed = false;
    /**
     * Source of the numeric component of each file name. Seeded from the clock at construction
     * time, so a clock supplied later through {@link #setClock(Clock)} does not affect the seed.
     */
    private final AtomicLong fileExtensionCounter = new AtomicLong(this.clock.currentTimeMillis());
    private volatile boolean isOpen = false;

    /**
     * Creates a writer and immediately configures the supplied {@link HDFSWriter} with
     * {@code context}. No file is opened until the first {@link #append(Event)}.
     *
     * @param rollInterval seconds after which an open file is rolled; {@code <= 0} disables
     * @param rollSize byte threshold for rolling; {@code <= 0} disables
     * @param rollCount event-count threshold for rolling; {@code <= 0} disables
     * @param batchSize number of appended events that triggers a flush
     * @param context configuration handed to {@code writer.configure}
     * @param filePath path prefix of the files this writer creates
     * @param fileSuffix optional suffix appended to the file name when no codec is set
     * @param codeC compression codec, or {@code null} for uncompressed output
     * @param compType compression type passed to the writer when a codec is set
     * @param writer the underlying writer; must not be {@code null}
     * @param formatter formatter passed to the writer on open and append
     * @param timedRollerPool executor used for the timed-roll and idle tasks
     * @param user user to perform file operations as, or {@code null}
     * @param sinkCounter counter updated on open, close, failure and append
     * @param idleTimeout seconds of inactivity before the writer is closed; {@code <= 0} disables
     * @param onIdleCallback invoked with {@code filePath} after an idle close; may be {@code null}
     */
    public BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, String fileSuffix, CompressionCodec codeC, SequenceFile.CompressionType compType, HDFSWriter writer, FlumeFormatter formatter, ScheduledExecutorService timedRollerPool, UserGroupInformation user, SinkCounter sinkCounter, int idleTimeout, HDFSEventSink.WriterCallback onIdleCallback) {
        this.rollInterval = rollInterval;
        this.rollSize = rollSize;
        this.rollCount = rollCount;
        this.batchSize = batchSize;
        this.context = context;
        this.filePath = filePath;
        this.fileSuffix = fileSuffix;
        this.codeC = codeC;
        this.compType = compType;
        this.writer = writer;
        this.formatter = formatter;
        this.timedRollerPool = timedRollerPool;
        this.user = user;
        this.sinkCounter = sinkCounter;
        this.onIdleCallback = onIdleCallback;
        this.idleTimeout = idleTimeout;
        writer.configure(context);
    }

    /**
     * Runs {@code action} as {@link #user} when one is configured, otherwise runs it directly.
     *
     * @throws IOException if the action throws it
     * @throws InterruptedException if the action throws it
     * @throws RuntimeException if the action throws one, or wrapping any other checked exception
     *     thrown when the action is run directly
     */
    private <T> T runPrivileged(PrivilegedExceptionAction<T> action) throws IOException, InterruptedException {
        if (this.user != null) {
            return this.user.doAs(action);
        }
        try {
            return action.run();
        } catch (IOException e) {
            throw e;
        } catch (InterruptedException e) {
            throw e;
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // PrivilegedExceptionAction.run() is declared to throw Exception; anything that is
            // not one of the expected types is surfaced unchecked.
            throw new RuntimeException("Unexpected exception.", e);
        }
    }

    /** Zeroes the per-file event, byte and batch counters. */
    private void resetCounters() {
        this.eventCounter = 0L;
        this.processSize = 0L;
        this.batchCounter = 0L;
    }

    /** Opens a new file via {@link #doOpen()}, wrapped in {@link #runPrivileged}. */
    private void open() throws IOException, InterruptedException {
        runPrivileged(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                BucketWriter.this.doOpen();
                return null;
            }
        });
    }

    /**
     * Creates the next in-use file and, when {@code rollInterval > 0}, schedules the task that
     * closes it after that many seconds.
     *
     * <p>The file name is {@code filePath + "." + counter}, followed by {@code fileSuffix} (if
     * non-empty) when no codec is set, or by the codec's default extension when one is set. Note
     * that {@code fileSuffix} is not applied when a codec is configured.
     *
     * <p>Declared {@code public} in this source; it is invoked internally through
     * {@link #open()}.
     *
     * @throws IOException if the path, writer or formatter is {@code null}, or if opening fails
     */
    public void doOpen() throws IOException {
        if (this.filePath == null || this.writer == null || this.formatter == null) {
            throw new IOException("Invalid file settings");
        }
        Configuration config = new Configuration();
        // Hadoop's "fs.automatic.close" controls the JVM shutdown hook that closes FileSystem
        // instances; it is switched off on the Configuration used for this lookup.
        config.setBoolean("fs.automatic.close", false);
        synchronized (staticLock) {
            try {
                long counter = this.fileExtensionCounter.incrementAndGet();
                String target = this.filePath + "." + counter;
                if (this.codeC == null) {
                    if (this.fileSuffix != null && this.fileSuffix.length() > 0) {
                        target += this.fileSuffix;
                    }
                } else {
                    target += this.codeC.getDefaultExtension();
                }
                this.bucketPath = target;
                this.fileSystem = new Path(target).getFileSystem(config);
                LOG.info("Creating " + target + IN_USE_EXT);
                if (this.codeC == null) {
                    this.writer.open(target + IN_USE_EXT, this.formatter);
                } else {
                    this.writer.open(target + IN_USE_EXT, this.codeC, this.compType, this.formatter);
                }
            } catch (Exception e) {
                this.sinkCounter.incrementConnectionFailedCount();
                if (e instanceof IOException) {
                    throw (IOException) e;
                }
                throw Throwables.propagate(e);
            }
        }
        this.sinkCounter.incrementConnectionCreatedCount();
        resetCounters();
        if (this.rollInterval > 0) {
            this.timedRollFuture = this.timedRollerPool.schedule(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.", BucketWriter.this.bucketPath + IN_USE_EXT, Long.valueOf(BucketWriter.this.rollInterval));
                    try {
                        BucketWriter.this.close();
                    } catch (Throwable th) {
                        // Logged rather than rethrown so the failure is not lost inside the
                        // never-inspected future.
                        LOG.error("Unexpected error", th);
                    }
                    return null;
                }
            }, this.rollInterval, TimeUnit.SECONDS);
        }
        this.isOpen = true;
    }

    /**
     * Flushes any pending batch, then closes the current file via {@link #doClose()}.
     *
     * @throws IOException if flushing or closing fails
     * @throws InterruptedException if a privileged action is interrupted
     */
    public synchronized void close() throws IOException, InterruptedException {
        flush();
        runPrivileged(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                BucketWriter.this.doClose();
                return null;
            }
        });
    }

    /**
     * Closes the underlying writer if it is open, cancels the pending roll and idle tasks, and
     * renames the in-use file to its final name.
     *
     * <p>An {@link IOException} from {@code writer.close()} is logged and counted as a failed
     * connection but not rethrown, so the rename is still attempted.
     *
     * <p>Declared {@code public} in this source; it is invoked internally through
     * {@link #close()}.
     *
     * @throws IOException if the rename step fails with an exception
     */
    public void doClose() throws IOException {
        LOG.debug("Closing {}", this.bucketPath + IN_USE_EXT);
        if (this.isOpen) {
            try {
                this.writer.close();
                this.sinkCounter.incrementConnectionClosedCount();
            } catch (IOException e) {
                LOG.warn("failed to close() HDFSWriter for file (" + this.bucketPath + IN_USE_EXT + "). Exception follows.", e);
                this.sinkCounter.incrementConnectionFailedCount();
            }
            this.isOpen = false;
        } else {
            LOG.info("HDFSWriter is already closed: {}", this.bucketPath + IN_USE_EXT);
        }
        if (this.timedRollFuture != null && !this.timedRollFuture.isDone()) {
            this.timedRollFuture.cancel(false);
            this.timedRollFuture = null;
        }
        if (this.idleFuture != null && !this.idleFuture.isDone()) {
            this.idleFuture.cancel(false);
            this.idleFuture = null;
        }
        if (this.bucketPath != null && this.fileSystem != null) {
            renameBucket();
            // Cleared so a second close does not attempt the rename again.
            this.fileSystem = null;
        }
    }

    /**
     * Syncs the writer if any events were appended since the last flush, then (re)schedules the
     * idle-close task when {@code idleTimeout > 0}. Does nothing when the batch is empty.
     *
     * @throws IOException if the sync fails
     * @throws InterruptedException if the privileged action is interrupted
     */
    public synchronized void flush() throws IOException, InterruptedException {
        if (isBatchComplete()) {
            return;
        }
        runPrivileged(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                BucketWriter.this.doFlush();
                return null;
            }
        });
        if (this.idleTimeout <= 0) {
            return;
        }
        // Only schedule a new idle task when there is none yet or the previous one was
        // successfully cancelled.
        if (this.idleFuture == null || this.idleFuture.cancel(false)) {
            this.idleFuture = this.timedRollerPool.schedule(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    try {
                        LOG.info("Closing idle bucketWriter {}", BucketWriter.this.filePath);
                        BucketWriter.this.idleClosed = true;
                        BucketWriter.this.close();
                        if (BucketWriter.this.onIdleCallback != null) {
                            BucketWriter.this.onIdleCallback.run(BucketWriter.this.filePath);
                        }
                    } catch (Throwable th) {
                        LOG.error("Unexpected error", th);
                    }
                    return null;
                }
            }, this.idleTimeout, TimeUnit.SECONDS);
        }
    }

    /**
     * Syncs the underlying writer and resets the batch counter.
     *
     * <p>Declared {@code public} in this source; it is invoked internally through
     * {@link #flush()}.
     *
     * @throws IOException if the sync fails
     */
    public void doFlush() throws IOException {
        this.writer.sync();
        this.batchCounter = 0L;
    }

    /**
     * Appends one event, opening a file first if none is open and rolling to a new file when
     * {@link #shouldRotate()} says so. A flush is triggered once {@code batchSize} events have
     * been appended since the last flush.
     *
     * <p>If the append (or the flush it triggers) throws an {@link IOException}, the file is
     * closed on a best-effort basis and the original exception is rethrown.
     *
     * @param event the event to write
     * @throws IOException if the writer was closed by the idle task, or on any write failure
     * @throws InterruptedException if a privileged action is interrupted
     */
    public synchronized void append(Event event) throws IOException, InterruptedException {
        if (!this.isOpen) {
            if (this.idleClosed) {
                throw new IOException("This bucket writer was closed due to idling and this handle is thus no longer valid");
            }
            open();
        }
        if (shouldRotate()) {
            close();
            open();
        }
        try {
            this.sinkCounter.incrementEventDrainAttemptCount();
            this.writer.append(event, this.formatter);
            this.processSize += event.getBody().length;
            this.eventCounter++;
            this.batchCounter++;
            if (this.batchCounter == this.batchSize) {
                flush();
            }
        } catch (IOException e) {
            LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" + this.bucketPath + IN_USE_EXT + ") and rethrowing exception.", e.getMessage());
            try {
                close();
            } catch (IOException closeFailure) {
                // The original write failure is what the caller needs to see; the close failure
                // is only logged.
                LOG.warn("Caught IOException while closing file (" + this.bucketPath + IN_USE_EXT + "). Exception follows.", closeFailure);
            }
            throw e;
        }
    }

    /**
     * Reports whether the current file has reached the configured event-count or byte-size
     * threshold. A threshold of zero or less is ignored.
     */
    private boolean shouldRotate() {
        boolean countReached = this.rollCount > 0 && this.rollCount <= this.eventCounter;
        if (countReached) {
            LOG.debug("rolling: rollCount: {}, events: {}", Long.valueOf(this.rollCount), Long.valueOf(this.eventCounter));
        }
        boolean sizeReached = this.rollSize > 0 && this.rollSize <= this.processSize;
        if (sizeReached) {
            LOG.debug("rolling: rollSize: {}, bytes: {}", Long.valueOf(this.rollSize), Long.valueOf(this.processSize));
        }
        return countReached || sizeReached;
    }

    /**
     * Renames the in-use file ({@code bucketPath + IN_USE_EXT}) to {@code bucketPath} if it
     * exists. A {@code false} result from the rename is logged as a warning instead of being
     * silently dropped; it is not turned into an exception.
     *
     * @throws IOException if the existence check or the rename throws
     */
    private void renameBucket() throws IOException {
        Path inUsePath = new Path(this.bucketPath + IN_USE_EXT);
        Path finalPath = new Path(this.bucketPath);
        if (this.fileSystem.exists(inUsePath)) {
            LOG.info("Renaming " + inUsePath + " to " + finalPath);
            if (!this.fileSystem.rename(inUsePath, finalPath)) {
                LOG.warn("Renaming " + inUsePath + " to " + finalPath + " returned false; the file may still carry the " + IN_USE_EXT + " suffix.");
            }
        }
    }

    /** Returns a short description containing the class name, {@code filePath} and {@code bucketPath}. */
    @Override
    public String toString() {
        return "[ " + getClass().getSimpleName() + " filePath = " + this.filePath + ", bucketPath = " + this.bucketPath + " ]";
    }

    /** Returns {@code true} when no events have been appended since the last flush. */
    private boolean isBatchComplete() {
        return this.batchCounter == 0;
    }

    /**
     * Replaces the clock. The file-name counter has already been seeded from the previous clock
     * by the time this can be called, so it is not affected.
     */
    void setClock(Clock clock) {
        this.clock = clock;
    }
}
