/*
 * Decompiled with CFR 0.152.
 */
package de.codepfleger.flume.parquet.sink;

import de.codepfleger.flume.parquet.serializer.ParquetSerializer;
import de.codepfleger.flume.parquet.sink.SerializerLinkedHashMap;
import de.codepfleger.flume.parquet.sink.SerializerMapEntry;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericData;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HDFSParquetSink
extends AbstractSink
implements Configurable {
    public static final String EVENTS_PER_TRANSACTION_KEY = "eventsPerTransaction";
    public static final String SCHEMA_KEY = "schema";
    public static final String FILE_PATH_KEY = "filePath";
    public static final String FILE_NAME_KEY = "fileName";
    public static final String FILE_SIZE_KEY = "fileSize";
    public static final String FILE_COMPRESSION_KEY = "fileCompression";
    public static final String FILE_QUEUE_SIZE_KEY = "fileQueueSize";
    public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds";
    private static final Logger LOG = LoggerFactory.getLogger(HDFSParquetSink.class);
    private final Object lock = new Object();
    private final Random random = new Random();
    private static final AtomicBoolean processingEnabled = new AtomicBoolean(false);
    private SerializerLinkedHashMap serializers;
    private Configuration configuration;
    private CompressionCodecName compressionCodec;
    private int eventsPerTransaction;
    private int timeoutSeconds;
    private String fileName;
    private String filePath;
    private Integer uncompressedFileSize;
    private String serializerType;
    private Context serializerContext;

    public synchronized void start() {
        super.start();
        final HDFSParquetSink sink = this;
        ShutdownHookManager.get().addShutdownHook(new Runnable(){

            @Override
            public void run() {
                sink.stop();
            }
        }, Integer.MAX_VALUE);
        processingEnabled.getAndSet(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void stop() {
        processingEnabled.getAndSet(false);
        Object object = this.lock;
        synchronized (object) {
            if (this.serializers != null && !this.serializers.isEmpty()) {
                for (SerializerMapEntry serializer : this.serializers.values()) {
                    try {
                        serializer.close();
                    }
                    catch (IOException e) {
                        LOG.error(e.getMessage(), (Throwable)e);
                    }
                }
                this.serializers.clear();
            }
        }
        super.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Sink.Status process() throws EventDeliveryException {
        if (processingEnabled.get()) {
            Channel ch = this.getChannel();
            txn.begin();
            try (Transaction txn = ch.getTransaction();){
                for (int i = 0; i < this.eventsPerTransaction; ++i) {
                    Event event = ch.take();
                    if (event == null) continue;
                    this.getSerializer(event).write(event);
                }
                txn.commit();
                Sink.Status i = Sink.Status.READY;
                return i;
            }
        }
        return Sink.Status.READY;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private EventSerializer getSerializer(Event event) throws IOException {
        String replacedPath = this.replaceWildcards(this.filePath, event);
        String replacedName = this.replaceWildcards(this.fileName, event);
        String path = replacedPath + replacedName;
        Object object = this.lock;
        synchronized (object) {
            SerializerMapEntry eventSerializer = (SerializerMapEntry)this.serializers.get(path);
            if (this.isSerializerInvalid(eventSerializer)) {
                eventSerializer.close();
                this.serializers.remove(path);
                eventSerializer = null;
            }
            if (eventSerializer == null) {
                eventSerializer = this.createSerializer(replacedPath, replacedName);
                this.serializers.put(path, eventSerializer);
            }
            return eventSerializer.getSerializer();
        }
    }

    private boolean isSerializerInvalid(SerializerMapEntry eventSerializer) {
        if (eventSerializer != null) {
            long serializerTimeout;
            if (eventSerializer.getSerializer().getWriter().getDataSize() > (long)this.uncompressedFileSize.intValue()) {
                return true;
            }
            long time = new Date().getTime();
            if (time > (serializerTimeout = eventSerializer.getStartTime() + (long)(this.timeoutSeconds * 1000))) {
                return true;
            }
        }
        return false;
    }

    private SerializerMapEntry createSerializer(String replacedPath, String replacedName) throws IOException {
        ParquetSerializer eventSerializer = (ParquetSerializer)EventSerializerFactory.getInstance((String)this.serializerType, (Context)this.serializerContext, null);
        String actualFileName = this.replaceRandomSalt(replacedName);
        String workingFilePath = replacedPath + "_" + actualFileName;
        String targetFilePath = replacedPath + actualFileName;
        Path working = new Path(workingFilePath);
        working.getFileSystem(this.configuration);
        ParquetWriter writer = ((AvroParquetWriter.Builder)AvroParquetWriter.builder((Path)working).withSchema(eventSerializer.getSchema()).withCompressionCodec(this.compressionCodec)).build();
        eventSerializer.initialize((ParquetWriter<GenericData.Record>)writer);
        return new SerializerMapEntry(working, this.configuration, targetFilePath, eventSerializer);
    }

    private String replaceRandomSalt(String fileName) {
        int nextInt = Math.abs(this.random.nextInt());
        fileName = fileName.contains("%[n]") ? fileName.replace("%[n]", "" + nextInt) : fileName + "." + nextInt;
        return fileName;
    }

    private String replaceWildcards(String value, Event event) {
        return BucketPath.escapeString((String)value, (Map)event.getHeaders(), null, (boolean)false, (int)0, (int)1, (boolean)true);
    }

    public void configure(Context context) {
        this.filePath = context.getString(FILE_PATH_KEY);
        if (this.filePath == null) {
            throw new IllegalStateException("filePath missing");
        }
        this.fileName = context.getString(FILE_NAME_KEY);
        if (this.fileName == null) {
            throw new IllegalStateException("fileName missing");
        }
        this.serializerType = context.getString("serializer");
        if (this.serializerType == null) {
            throw new IllegalStateException("serializer missing");
        }
        this.compressionCodec = CompressionCodecName.fromConf((String)context.getString(FILE_COMPRESSION_KEY, CompressionCodecName.SNAPPY.name()));
        this.eventsPerTransaction = context.getInteger(EVENTS_PER_TRANSACTION_KEY, Integer.valueOf(10));
        this.uncompressedFileSize = context.getInteger(FILE_SIZE_KEY, Integer.valueOf(500000));
        this.timeoutSeconds = context.getInteger(TIMEOUT_SECONDS_KEY, Integer.valueOf(3600));
        this.serializers = new SerializerLinkedHashMap(context.getInteger(FILE_QUEUE_SIZE_KEY, Integer.valueOf(2)));
        this.serializerContext = new Context((Map)context.getSubProperties("serializer."));
        this.configuration = new Configuration();
        this.configuration.setBoolean("fs.automatic.close", false);
    }
}

