/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.PathManager;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RollingFileSink
extends AbstractSink
implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(RollingFileSink.class);
    private static final long defaultRollInterval = 30L;
    private static final int defaultBatchSize = 100;
    private int batchSize = 100;
    private File directory;
    private long rollInterval;
    private OutputStream outputStream;
    private ScheduledExecutorService rollService;
    private String serializerType;
    private Context serializerContext;
    private EventSerializer serializer;
    private SinkCounter sinkCounter;
    private PathManager pathController = new PathManager();
    private volatile boolean shouldRotate = false;

    @Override
    public void configure(Context context) {
        String directory = context.getString("sink.directory");
        String rollInterval = context.getString("sink.rollInterval");
        this.serializerType = context.getString("sink.serializer", "TEXT");
        this.serializerContext = new Context(context.getSubProperties("sink.serializer."));
        Preconditions.checkArgument(directory != null, "Directory may not be null");
        Preconditions.checkNotNull(this.serializerType, "Serializer type is undefined");
        this.rollInterval = rollInterval == null ? 30L : Long.parseLong(rollInterval);
        this.batchSize = context.getInteger("sink.batchSize", 100);
        this.directory = new File(directory);
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(this.getName());
        }
    }

    @Override
    public void start() {
        logger.info("Starting {}...", (Object)this);
        this.sinkCounter.start();
        super.start();
        this.pathController.setBaseDirectory(this.directory);
        if (this.rollInterval > 0L) {
            this.rollService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("rollingFileSink-roller-" + Thread.currentThread().getId() + "-%d").build());
            this.rollService.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    logger.debug("Marking time to rotate file {}", (Object)RollingFileSink.this.pathController.getCurrentFile());
                    RollingFileSink.this.shouldRotate = true;
                }
            }, this.rollInterval, this.rollInterval, TimeUnit.SECONDS);
        } else {
            logger.info("RollInterval is not valid, file rolling will not happen.");
        }
        logger.info("RollingFileSink {} started.", (Object)this.getName());
    }

    @Override
    public Sink.Status process() throws EventDeliveryException {
        if (this.shouldRotate) {
            logger.debug("Time to rotate {}", (Object)this.pathController.getCurrentFile());
            if (this.outputStream != null) {
                logger.debug("Closing file {}", (Object)this.pathController.getCurrentFile());
                try {
                    this.serializer.flush();
                    this.serializer.beforeClose();
                    this.outputStream.close();
                    this.sinkCounter.incrementConnectionClosedCount();
                    this.shouldRotate = false;
                }
                catch (IOException e) {
                    this.sinkCounter.incrementConnectionFailedCount();
                    throw new EventDeliveryException("Unable to rotate file " + this.pathController.getCurrentFile() + " while delivering event", e);
                }
                finally {
                    this.serializer = null;
                    this.outputStream = null;
                }
                this.pathController.rotate();
            }
        }
        if (this.outputStream == null) {
            File currentFile = this.pathController.getCurrentFile();
            logger.debug("Opening output stream for file {}", (Object)currentFile);
            try {
                this.outputStream = new BufferedOutputStream(new FileOutputStream(currentFile));
                this.serializer = EventSerializerFactory.getInstance(this.serializerType, this.serializerContext, this.outputStream);
                this.serializer.afterCreate();
                this.sinkCounter.incrementConnectionCreatedCount();
            }
            catch (IOException e) {
                this.sinkCounter.incrementConnectionFailedCount();
                throw new EventDeliveryException("Failed to open file " + this.pathController.getCurrentFile() + " while delivering event", e);
            }
        }
        Channel channel = this.getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        Sink.Status result = Sink.Status.READY;
        try {
            transaction.begin();
            int eventAttemptCounter = 0;
            for (int i = 0; i < this.batchSize; ++i) {
                event = channel.take();
                if (event != null) {
                    this.sinkCounter.incrementEventDrainAttemptCount();
                    ++eventAttemptCounter;
                } else {
                    result = Sink.Status.BACKOFF;
                    break;
                }
                this.serializer.write(event);
            }
            this.serializer.flush();
            this.outputStream.flush();
            transaction.commit();
            this.sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
        }
        catch (Exception ex) {
            transaction.rollback();
            throw new EventDeliveryException("Failed to process transaction", ex);
        }
        finally {
            transaction.close();
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        logger.info("RollingFile sink {} stopping...", (Object)this.getName());
        this.sinkCounter.stop();
        super.stop();
        if (this.outputStream != null) {
            logger.debug("Closing file {}", (Object)this.pathController.getCurrentFile());
            try {
                this.serializer.flush();
                this.serializer.beforeClose();
                this.outputStream.close();
                this.sinkCounter.incrementConnectionClosedCount();
            }
            catch (IOException e) {
                this.sinkCounter.incrementConnectionFailedCount();
                logger.error("Unable to close output stream. Exception follows.", (Throwable)e);
            }
            finally {
                this.outputStream = null;
                this.serializer = null;
            }
        }
        if (this.rollInterval > 0L) {
            this.rollService.shutdown();
            while (!this.rollService.isTerminated()) {
                try {
                    this.rollService.awaitTermination(1L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for roll service to stop. Please report this.", (Throwable)e);
                }
            }
        }
        logger.info("RollingFile sink {} stopped. Event metrics: {}", (Object)this.getName(), (Object)this.sinkCounter);
    }

    public File getDirectory() {
        return this.directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public long getRollInterval() {
        return this.rollInterval;
    }

    public void setRollInterval(long rollInterval) {
        this.rollInterval = rollInterval;
    }
}

