/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.hdfs3;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pulsar.io.hdfs3.AbstractHdfsConfig;
import org.apache.pulsar.io.hdfs3.HdfsResources;
import org.apache.pulsar.io.hdfs3.SecurityUtil;
import org.apache.pulsar.io.hdfs3.sink.HdfsSinkConfig;

public abstract class AbstractHdfsConnector {
    private static final Object RESOURCES_LOCK = new Object();
    protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference();
    protected AbstractHdfsConfig connectorConfig;
    protected CompressionCodecFactory compressionCodecFactory;

    public AbstractHdfsConnector() {
        this.hdfsResources.set(new HdfsResources(null, null, null));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
        FileSystem fs;
        UserGroupInformation ugi;
        ExtendedConfiguration config = new ExtendedConfiguration();
        config.setClassLoader(Thread.currentThread().getContextClassLoader());
        AbstractHdfsConnector.getConfig(config, this.connectorConfig.getHdfsConfigResources());
        this.checkHdfsUriForTimeout(config);
        String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri((Configuration)config).getScheme());
        config.set(disableCacheName, "true");
        Object object = RESOURCES_LOCK;
        synchronized (object) {
            if (SecurityUtil.isSecurityEnabled(config)) {
                ugi = SecurityUtil.loginKerberos(config, this.connectorConfig.getKerberosUserPrincipal(), this.connectorConfig.getKeytab());
                fs = this.getFileSystemAsUser(config, ugi);
            } else {
                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                config.set("hadoop.security.authentication", "simple");
                ugi = SecurityUtil.loginSimple(config);
                fs = this.getFileSystemAsUser(config, ugi);
            }
        }
        return new HdfsResources(config, fs, ugi);
    }

    private static Configuration getConfig(Configuration config, String res) throws IOException {
        boolean foundResources = false;
        if (null != res) {
            String[] resources;
            for (String resource : resources = res.split(",")) {
                config.addResource(new Path(resource.trim()));
                foundResources = true;
            }
        }
        if (!foundResources) {
            String configStr = config.toString();
            for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
                if (resource.contains("default") || config.getResource(resource.trim()) == null) continue;
                foundResources = true;
                break;
            }
        }
        if (!foundResources) {
            throw new IOException("Could not find any of the " + res + " on the classpath");
        }
        return config;
    }

    protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
        URI hdfsUri = FileSystem.getDefaultUri((Configuration)config);
        String address = hdfsUri.getAuthority();
        int port = hdfsUri.getPort();
        if (address == null || address.isEmpty() || port < 0) {
            return;
        }
        InetSocketAddress namenode = NetUtils.createSocketAddr((String)address, (int)port);
        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory((Configuration)config);
        try (Socket socket = socketFactory.createSocket();){
            NetUtils.connect((Socket)socket, (SocketAddress)namenode, (int)1000);
        }
    }

    protected FileSystem getFileSystem(Configuration config) throws IOException {
        return FileSystem.get((Configuration)config);
    }

    protected FileSystem getFileSystemAsUser(Configuration config, UserGroupInformation ugi) throws IOException {
        try {
            return (FileSystem)ugi.doAs(() -> FileSystem.get((Configuration)config));
        }
        catch (InterruptedException e) {
            throw new IOException("Unable to create file system: " + e.getMessage());
        }
    }

    protected Configuration getConfiguration() {
        return this.hdfsResources.get().getConfiguration();
    }

    protected FileSystem getFileSystem() {
        return this.hdfsResources.get().getFileSystem();
    }

    protected UserGroupInformation getUserGroupInformation() {
        return this.hdfsResources.get().getUserGroupInformation();
    }

    protected String getEncoding() {
        return StringUtils.isNotBlank((String)this.connectorConfig.getEncoding()) ? this.connectorConfig.getEncoding() : Charset.defaultCharset().name();
    }

    protected CompressionCodec getCompressionCodec() {
        if (this.connectorConfig.getCompression() == null) {
            return null;
        }
        CompressionCodec codec = this.getCompressionCodecFactory().getCodecByName(this.connectorConfig.getCompression().name());
        return codec != null ? codec : new DefaultCodec();
    }

    protected CompressionCodecFactory getCompressionCodecFactory() {
        if (this.compressionCodecFactory == null) {
            this.compressionCodecFactory = new CompressionCodecFactory(this.getConfiguration());
        }
        return this.compressionCodecFactory;
    }

    static class ExtendedConfiguration
    extends Configuration {
        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap();

        ExtendedConfiguration() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Class<?> getClassByNameOrNull(String name) {
            Map<String, WeakReference<Class<Object>>> map;
            ClassLoader classLoader = this.getClassLoader();
            Map<ClassLoader, Map<String, WeakReference<Class<?>>>> map2 = this.cacheClasses;
            synchronized (map2) {
                map = this.cacheClasses.get(classLoader);
                if (map == null) {
                    map = Collections.synchronizedMap(new WeakHashMap());
                    this.cacheClasses.put(classLoader, map);
                }
            }
            Class<?> clazz = null;
            WeakReference<Class<Object>> ref = map.get(name);
            if (ref != null) {
                clazz = (Class<?>)ref.get();
            }
            if (clazz == null) {
                try {
                    clazz = Class.forName(name, true, classLoader);
                }
                catch (ClassNotFoundException | NoClassDefFoundError e) {
                    return null;
                }
                map.put(name, new WeakReference(clazz));
                return clazz;
            }
            return clazz;
        }
    }
}

