/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.hdfs3.sink;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.hdfs3.AbstractHdfsConnector;
import org.apache.pulsar.io.hdfs3.HdfsResources;
import org.apache.pulsar.io.hdfs3.sink.HdfsSinkConfig;
import org.apache.pulsar.io.hdfs3.sink.HdfsSyncThread;

public abstract class HdfsAbstractSink<K, V>
extends AbstractHdfsConnector
implements Sink<V> {
    protected HdfsSinkConfig hdfsSinkConfig;
    protected BlockingQueue<Record<V>> unackedRecords;
    protected HdfsSyncThread<V> syncThread;
    private Path path;
    private FSDataOutputStream hdfsStream;

    public abstract KeyValue<K, V> extractKeyValue(Record<V> var1);

    protected abstract void createWriter() throws IOException;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.hdfsSinkConfig = HdfsSinkConfig.load(config);
        this.hdfsSinkConfig.validate();
        this.connectorConfig = this.hdfsSinkConfig;
        this.unackedRecords = new LinkedBlockingQueue<Record<V>>(this.hdfsSinkConfig.getMaxPendingRecords());
        this.connectToHdfs();
        this.createWriter();
        this.launchSyncThread();
    }

    public void close() throws Exception {
        this.syncThread.halt();
        this.syncThread.join(0L);
    }

    protected final void connectToHdfs() throws IOException {
        try {
            HdfsResources resources = (HdfsResources)this.hdfsResources.get();
            if (resources.getConfiguration() == null) {
                resources = this.resetHDFSResources(this.hdfsSinkConfig);
                this.hdfsResources.set(resources);
            }
        }
        catch (IOException ex) {
            this.hdfsResources.set(new HdfsResources(null, null, null));
            throw ex;
        }
    }

    protected final FSDataOutputStreamBuilder getOutputStreamBuilder() throws IOException {
        Path path = this.getPath();
        FileSystem fs = this.getFileSystemAsUser(this.getConfiguration(), this.getUserGroupInformation());
        FSDataOutputStreamBuilder builder = fs.exists(path) ? fs.appendFile(path) : fs.createFile(path);
        return builder.recursive().permission(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
    }

    protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException {
        if (this.hdfsStream == null) {
            this.hdfsStream = this.getOutputStreamBuilder().build();
        }
        return this.hdfsStream;
    }

    protected final Path getPath() {
        if (this.path == null) {
            String ext = "";
            if (StringUtils.isNotBlank((CharSequence)this.hdfsSinkConfig.getFileExtension())) {
                ext = this.hdfsSinkConfig.getFileExtension();
            } else if (this.getCompressionCodec() != null) {
                ext = this.getCompressionCodec().getDefaultExtension();
            }
            this.path = new Path(FilenameUtils.concat((String)this.hdfsSinkConfig.getDirectory(), (String)(this.hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext)));
        }
        return this.path;
    }

    protected final void launchSyncThread() throws IOException {
        this.syncThread = new HdfsSyncThread<V>((Syncable)this.getHdfsStream(), this.unackedRecords, this.hdfsSinkConfig.getSyncInterval());
        this.syncThread.start();
    }
}

