package org.apache.streams.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/hdfs/WebHdfsPersistReader.class */
/**
 * Streams persist reader that resolves a configured path through a Hadoop
 * {@link FileSystem} client and exposes the datums placed on
 * {@link #persistQueue} as {@link StreamsResultSet}s.
 *
 * <p>The actual file reading is delegated to {@code WebHdfsPersistReaderTask},
 * which is handed this reader instance. Typical lifecycle:
 * {@link #prepare(Object)}, then {@link #startStream()} or {@link #readAll()},
 * then {@link #cleanUp()}.
 */
public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable {
    public static final String STREAMS_ID = "WebHdfsPersistReader";
    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class);
    protected static final char DELIMITER = '\t';

    /** Upper bound on the number of datums buffered in {@link #persistQueue}. */
    private static final int PERSIST_QUEUE_CAPACITY = 10000;

    /** File system client; stays {@code null} until a connection attempt succeeds. */
    protected FileSystem client;
    /** Path to read, built in {@link #prepare(Object)} from the configured path and reader path. */
    protected Path path;
    /** Entries resolved for {@link #path}; empty when the path could not be resolved. */
    protected FileStatus[] status;
    /** Buffer of datums waiting to be handed out; created in {@link #prepare(Object)}. */
    protected volatile Queue<StreamsDatum> persistQueue;
    protected HdfsReaderConfiguration hdfsConfiguration;
    /** Single-thread executor that runs the task submitted by {@link #startStream()}. */
    private ExecutorService executor;
    /** Handle of the task submitted by {@link #startStream()}; {@code null} before that call. */
    private Future<?> task;
    protected ObjectMapper mapper = StreamsJacksonMapper.getInstance();
    /** Counters accumulated across all {@link #readCurrent()} calls. */
    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
    /** Counters accumulated since the last {@link #readCurrent()} call. */
    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();

    /**
     * Creates a reader for the given configuration. No connection is opened here.
     *
     * @param hdfsReaderConfiguration scheme, host, port, user and paths to read from
     */
    public WebHdfsPersistReader(HdfsReaderConfiguration hdfsReaderConfiguration) {
        this.hdfsConfiguration = hdfsReaderConfiguration;
    }

    /**
     * Builds the file system URI from the configuration: {@code scheme://host:port},
     * or {@code scheme:///} when no host is configured.
     *
     * @return the URI used to obtain the file system client
     * @throws URISyntaxException if the assembled string is not a valid URI
     */
    public URI getURI() throws URISyntaxException {
        StringBuilder uri = new StringBuilder();
        uri.append(this.hdfsConfiguration.getScheme()).append("://");
        if (Strings.isNullOrEmpty(this.hdfsConfiguration.getHost())) {
            uri.append("/");
        } else {
            uri.append(this.hdfsConfiguration.getHost() + ":" + this.hdfsConfiguration.getPort());
        }
        return new URI(uri.toString());
    }

    /**
     * @return {@code true} once a file system client has been obtained
     */
    public boolean isConnected() {
        return this.client != null;
    }

    /**
     * Returns the file system client, attempting to connect first if there is none yet.
     *
     * @return the client, or {@code null} if the connection attempt failed
     */
    public final synchronized FileSystem getFileSystem() {
        if (!isConnected()) {
            connectToWebHDFS();
        }
        return this.client;
    }

    /**
     * Obtains a {@link FileSystem} for {@link #getURI()} while acting as the configured
     * remote user and stores it in {@link #client}. Failures are logged and leave
     * {@link #client} unchanged; they are not propagated.
     */
    private synchronized void connectToWebHDFS() {
        try {
            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
            ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    Configuration configuration = new Configuration();
                    // NOTE(review): "kerberos" here sits next to a SIMPLE authentication method on the
                    // remote user above; kept as-is because existing deployments may rely on it - confirm.
                    configuration.set("hadoop.security.authentication", "kerberos");
                    URI uri = getURI();
                    LOGGER.info("WebURI : {}", uri.toString());
                    client = FileSystem.get(uri, configuration);
                    LOGGER.info("Connected to WebHDFS");
                    return null;
                }
            });
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // doAs can be interrupted; keep the interrupt visible to the caller's thread.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", e);
        }
    }

    /**
     * Connects to the file system, resolves the configured path into {@link #status},
     * and creates the datum queue and the executor used by {@link #startStream()}.
     *
     * <p>If the path is neither a file nor a directory, or resolving it fails with an
     * {@link IOException}, the problem is logged and {@link #status} is left empty.
     *
     * @param obj not used by this implementation
     * @throws IllegalStateException if no file system connection could be established
     */
    public void prepare(Object obj) {
        LOGGER.debug("Prepare");
        connectToWebHDFS();
        if (this.client == null) {
            // Fail with a clear message instead of a NullPointerException on the first client call.
            throw new IllegalStateException(
                    "Unable to prepare WebHdfsPersistReader: no WebHDFS connection is available");
        }
        this.path = new Path(this.hdfsConfiguration.getPath() + "/" + this.hdfsConfiguration.getReaderPath());
        // Start from an empty listing so a failed lookup never leaves a null or stale array behind.
        this.status = new FileStatus[0];
        try {
            if (this.client.isFile(this.path)) {
                this.status = new FileStatus[] {this.client.getFileStatus(this.path)};
            } else if (this.client.isDirectory(this.path)) {
                this.status = this.client.listStatus(this.path);
            } else {
                LOGGER.error("Neither file nor directory, wtf");
            }
        } catch (IOException e) {
            LOGGER.error("Unable to resolve files to read under " + this.path, e);
        }
        this.persistQueue = Queues.synchronizedQueue(
                new LinkedBlockingQueue<StreamsDatum>(PERSIST_QUEUE_CAPACITY));
        this.executor = Executors.newSingleThreadExecutor();
    }

    /**
     * Releases the executor created in {@link #prepare(Object)}. A task that is already
     * running is allowed to finish; no new tasks are accepted afterwards, so
     * {@link #prepare(Object)} must be called again before {@link #startStream()}.
     * The file system client is left open.
     */
    public void cleanUp() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    /**
     * Runs a {@code WebHdfsPersistReaderTask} on a dedicated thread, waits for it to
     * finish, and returns a result set backed by {@link #persistQueue}.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and whatever has been queued so far is returned.
     *
     * @return result set wrapping the live datum queue
     */
    public StreamsResultSet readAll() {
        Thread readerThread = new Thread(new WebHdfsPersistReaderTask(this));
        readerThread.start();
        try {
            readerThread.join();
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for the reader task to finish", e);
            Thread.currentThread().interrupt();
        }
        return new StreamsResultSet(this.persistQueue);
    }

    /**
     * Submits a {@code WebHdfsPersistReaderTask} to the executor created in
     * {@link #prepare(Object)}; progress can be polled with {@link #isRunning()}.
     */
    public void startStream() {
        LOGGER.debug("startStream");
        this.task = this.executor.submit(new WebHdfsPersistReaderTask(this));
    }

    /**
     * Drains the datums queued so far into a new result set, attaches a copy of the
     * current counters to it, adds those counters to the running total, and resets them.
     *
     * @return result set holding a snapshot of the queue taken before it was cleared
     */
    public StreamsResultSet readCurrent() {
        StreamsResultSet current;
        // Class-level lock kept as in the original.
        // NOTE(review): presumably the reader task enqueues under the same monitor - confirm.
        synchronized (WebHdfsPersistReader.class) {
            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(this.persistQueue));
            current.setCounter(new DatumStatusCounter());
            current.getCounter().add(this.countersCurrent);
            this.countersTotal.add(this.countersCurrent);
            this.countersCurrent = new DatumStatusCounter();
            this.persistQueue.clear();
        }
        return current;
    }

    /**
     * Not implemented.
     *
     * @param bigInteger ignored
     * @return always {@code null}
     */
    public StreamsResultSet readNew(BigInteger bigInteger) {
        return null;
    }

    /**
     * Not implemented.
     *
     * @param dateTime ignored
     * @param dateTime2 ignored
     * @return always {@code null}
     */
    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return null;
    }

    /**
     * @return {@code true} while the task submitted by {@link #startStream()} is neither
     *         done nor cancelled; {@code false} if no task has been submitted yet
     */
    public boolean isRunning() {
        Future<?> current = this.task;
        if (current == null) {
            return false;
        }
        return !current.isDone() && !current.isCancelled();
    }

    /**
     * @return the counters accumulated over all {@link #readCurrent()} calls
     */
    public DatumStatusCounter getDatumStatusCounter() {
        return this.countersTotal;
    }
}
