/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteConfiguration;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.hdfs.HdfsReaderConfiguration;
import org.apache.streams.hdfs.WebHdfsPersistReaderTask;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Persist reader that resolves a file or directory on a Hadoop {@link FileSystem}
 * and exposes the datums produced by a {@link WebHdfsPersistReaderTask} through a
 * bounded in-memory queue.
 *
 * <p>Typical lifecycle: {@link #prepare(Object)}, then {@link #startStream()} (or
 * {@link #readAll()}), repeated {@link #readCurrent()} while {@link #isRunning()},
 * and finally {@link #cleanUp()}.
 *
 * <p>Queue hand-off between {@link #write(StreamsDatum)} and {@link #readCurrent()}
 * is serialized on the {@code WebHdfsPersistReader.class} monitor.
 */
public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable {
    public static final String STREAMS_ID = "WebHdfsPersistReader";
    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class);
    protected static final char DELIMITER = '\t';

    /** File system handle; {@code null} until a connection attempt has succeeded. */
    protected FileSystem client;
    /** Location to read, built in {@link #prepare(Object)} from the configured path and reader path. */
    protected Path path;
    /** Entries found at {@link #path}; stays unset if the path could not be resolved. */
    protected FileStatus[] status;
    /** Bounded hand-off queue, filled via {@link #write(StreamsDatum)} and drained by {@link #readCurrent()}. */
    protected volatile Queue<StreamsDatum> persistQueue;
    protected ObjectMapper mapper;
    protected LineReadWriteUtil lineReaderUtil;
    protected HdfsReaderConfiguration hdfsConfiguration;
    protected StreamsConfiguration streamsConfiguration;
    /** Runs the background reader task submitted by {@link #startStream()}. */
    private ExecutorService executor;
    /** Counters accumulated over every {@link #readCurrent()} call so far. */
    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
    /** Counters accumulated since the last {@link #readCurrent()} call. */
    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
    /** Handle of the task submitted by {@link #startStream()}; {@code null} before that. */
    private Future<?> task;

    /**
     * Creates a reader using the configuration detected by a
     * {@link ComponentConfigurator} for {@link HdfsReaderConfiguration}.
     */
    public WebHdfsPersistReader() {
        this((HdfsReaderConfiguration)new ComponentConfigurator(HdfsReaderConfiguration.class).detectConfiguration());
    }

    /**
     * Creates a reader using the supplied configuration.
     *
     * @param hdfsConfiguration connection and path settings for this reader
     */
    public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
        this.hdfsConfiguration = hdfsConfiguration;
    }

    /**
     * Builds the file system URI from the configured scheme, host and port.
     *
     * <p>The result is {@code scheme://host[:port]} when a non-blank host is
     * configured, and {@code scheme:///} otherwise.
     *
     * @return the file system URI
     * @throws URISyntaxException if the assembled string is not a valid URI
     */
    public URI getURI() throws URISyntaxException {
        StringBuilder uriBuilder = new StringBuilder();
        uriBuilder.append(this.hdfsConfiguration.getScheme()).append("://");
        if (StringUtils.isNotBlank(this.hdfsConfiguration.getHost())) {
            uriBuilder.append(this.hdfsConfiguration.getHost());
            if (this.hdfsConfiguration.getPort() != null) {
                uriBuilder.append(':').append(this.hdfsConfiguration.getPort());
            }
        } else {
            uriBuilder.append("/");
        }
        return new URI(uriBuilder.toString());
    }

    /**
     * Reports whether a file system handle has been obtained.
     *
     * @return {@code true} if {@link #client} is non-null
     */
    public boolean isConnected() {
        return this.client != null;
    }

    /**
     * Returns the file system handle, attempting to connect first if there is none.
     *
     * @return the file system handle, or {@code null} if the connection attempt failed
     */
    public final synchronized FileSystem getFileSystem() {
        if (!this.isConnected()) {
            this.connectToWebHDFS();
        }
        return this.client;
    }

    /**
     * Obtains a {@link FileSystem} for {@link #getURI()} while acting as the
     * configured remote user and stores it in {@link #client}.
     *
     * <p>Failures are logged rather than propagated; on failure {@link #client}
     * is left unchanged.
     */
    private synchronized void connectToWebHDFS() {
        try {
            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
            ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
            // The explicit target type is required: doAs is overloaded for PrivilegedAction
            // and PrivilegedExceptionAction, and a bare zero-argument lambda matches both.
            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                Configuration conf = new Configuration();
                conf.set("hadoop.security.authentication", "kerberos");
                conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
                conf.set("fs.file.impl", LocalFileSystem.class.getName());
                URI uri = this.getURI();
                LOGGER.info("WebURI : {}", uri.toString());
                this.client = FileSystem.get(uri, conf);
                LOGGER.info("Connected to WebHDFS");
                return null;
            });
        }
        catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", ex);
        }
    }

    /**
     * Returns the identifier of this component.
     *
     * @return {@link #STREAMS_ID}
     */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Connects to the file system, resolves the configured location into
     * {@link #status}, and creates the queue, executor and mapper used for reading.
     *
     * <p>A single file yields a one-element {@link #status}; a directory yields its
     * listing sorted by the natural ordering of {@link FileStatus}. I/O errors while
     * resolving the location are logged and leave {@link #status} unset.
     *
     * @param configurationObject not used by this implementation
     * @throws IllegalStateException if no file system connection could be established
     */
    public void prepare(Object configurationObject) {
        LOGGER.debug("Prepare");
        this.lineReaderUtil = LineReadWriteUtil.getInstance((LineReadWriteConfiguration)this.hdfsConfiguration);
        this.connectToWebHDFS();
        if (this.client == null) {
            // connectToWebHDFS only logs its failure; fail here with a clear message
            // instead of a bare NullPointerException on the first client call below.
            throw new IllegalStateException("Not connected to WebHDFS; see the preceding log output for the cause");
        }
        String pathString = this.hdfsConfiguration.getPath() + "/" + this.hdfsConfiguration.getReaderPath();
        LOGGER.info("Path : {}", pathString);
        this.path = new Path(pathString);
        try {
            if (this.client.isFile(this.path)) {
                LOGGER.info("Found File");
                this.status = new FileStatus[] {this.client.getFileStatus(this.path)};
            } else if (this.client.isDirectory(this.path)) {
                FileStatus[] listing = this.client.listStatus(this.path);
                Arrays.sort(listing);
                this.status = listing;
                LOGGER.info("Found Directory : {} files", this.status.length);
            } else {
                LOGGER.error("Neither file nor directory, wtf");
            }
        }
        catch (IOException ex) {
            LOGGER.error("IOException", ex);
        }
        this.streamsConfiguration = StreamsConfigurator.detectConfiguration();
        // Capacity is bounded by the configured batch size; write() retries while the queue is full.
        this.persistQueue = Queues.synchronizedQueue(
                new LinkedBlockingQueue<StreamsDatum>(this.streamsConfiguration.getBatchSize().intValue()));
        this.executor = Executors.newSingleThreadExecutor();
        this.mapper = StreamsJacksonMapper.getInstance();
    }

    /**
     * Releases the background executor created by {@link #prepare(Object)}.
     *
     * <p>An orderly shutdown is requested: a task that is already running is allowed
     * to finish, after which the worker thread exits. Safe to call when
     * {@link #prepare(Object)} was never invoked.
     */
    public void cleanUp() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    /**
     * Runs a {@link WebHdfsPersistReaderTask} on a dedicated thread, waits for it to
     * end, and returns a result set backed by the hand-off queue.
     *
     * <p>If the calling thread is interrupted while waiting, the wait ends early and
     * the interrupt flag is restored.
     *
     * @return a result set wrapping {@link #persistQueue}
     */
    public StreamsResultSet readAll() {
        WebHdfsPersistReaderTask readerTask = new WebHdfsPersistReaderTask(this);
        Thread readerThread = new Thread(readerTask);
        readerThread.start();
        try {
            readerThread.join();
        }
        catch (InterruptedException ignored) {
            LOGGER.trace("ignored InterruptedException", ignored);
            Thread.currentThread().interrupt();
        }
        return new StreamsResultSet(this.persistQueue);
    }

    /**
     * Submits a {@link WebHdfsPersistReaderTask} to the executor created in
     * {@link #prepare(Object)} and remembers its {@link Future}.
     */
    public void startStream() {
        LOGGER.debug("startStream");
        this.task = this.executor.submit(new WebHdfsPersistReaderTask(this));
    }

    /**
     * Takes a snapshot of everything currently queued and then empties the queue.
     *
     * <p>The counters gathered since the previous call are attached to the returned
     * result set, folded into the running totals, and reset. The whole exchange runs
     * under the class monitor so that it cannot interleave with
     * {@link #write(StreamsDatum)}.
     *
     * @return a result set holding a copy of the queued datums and the current counters
     */
    public StreamsResultSet readCurrent() {
        synchronized (WebHdfsPersistReader.class) {
            StreamsResultSet current = new StreamsResultSet(new ConcurrentLinkedQueue<StreamsDatum>(this.persistQueue));
            current.setCounter(new DatumStatusCounter());
            current.getCounter().add(this.countersCurrent);
            this.countersTotal.add(this.countersCurrent);
            this.countersCurrent = new DatumStatusCounter();
            this.persistQueue.clear();
            return current;
        }
    }

    /**
     * Adds a datum to the hand-off queue, retrying until the queue accepts it.
     *
     * <p>Each offer is made under the class monitor, the same lock
     * {@link #readCurrent()} holds while it copies and clears the queue; without it a
     * datum offered between the copy and the clear would be dropped. The lock is
     * released between attempts so a reader can drain a full queue.
     *
     * @param entry the datum to enqueue
     */
    protected void write(StreamsDatum entry) {
        boolean success;
        do {
            synchronized (WebHdfsPersistReader.class) {
                success = this.persistQueue.offer(entry);
            }
            Thread.yield();
        } while (!success);
    }

    /**
     * Not implemented by this reader.
     *
     * @param sequence ignored
     * @return always {@code null}
     */
    public StreamsResultSet readNew(BigInteger sequence) {
        return null;
    }

    /**
     * Not implemented by this reader.
     *
     * @param start ignored
     * @param end ignored
     * @return always {@code null}
     */
    public StreamsResultSet readRange(DateTime start, DateTime end) {
        return null;
    }

    /**
     * Reports whether reading may still be in progress.
     *
     * @return {@code true} if no task has been submitted yet, or if the submitted
     *         task is neither done nor cancelled
     */
    public boolean isRunning() {
        return this.task == null || (!this.task.isDone() && !this.task.isCancelled());
    }

    /**
     * Returns the counters accumulated across all {@link #readCurrent()} calls.
     *
     * @return the running total counter
     */
    public DatumStatusCounter getDatumStatusCounter() {
        return this.countersTotal;
    }
}

