package org.apache.streams.hdfs;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/hdfs/WebHdfsPersistReaderTask.class */
public class WebHdfsPersistReaderTask implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReaderTask.class);
    private WebHdfsPersistReader reader;

    public WebHdfsPersistReaderTask(WebHdfsPersistReader webHdfsPersistReader) {
        this.reader = webHdfsPersistReader;
    }

    @Override // java.lang.Runnable
    public void run() {
        for (FileStatus fileStatus : this.reader.status) {
            LOGGER.info("Found " + fileStatus.getPath().getName());
            if (fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
                LOGGER.info("Started Processing " + fileStatus.getPath().getName());
                try {
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.reader.client.open(fileStatus.getPath())));
                    String str = "";
                    do {
                        try {
                            str = bufferedReader.readLine();
                            if (!Strings.isNullOrEmpty(str)) {
                                this.reader.countersCurrent.incrementAttempt();
                                StreamsDatum processLine = processLine(str);
                                if (processLine != null) {
                                    write(processLine);
                                    this.reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                                } else {
                                    LOGGER.warn("processLine failed");
                                    this.reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
                                }
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            LOGGER.warn(e.getMessage());
                            this.reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
                        }
                    } while (!Strings.isNullOrEmpty(str));
                    LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
                    try {
                        bufferedReader.close();
                    } catch (Exception e2) {
                        e2.printStackTrace();
                        LOGGER.error(e2.getMessage());
                    }
                } catch (Exception e3) {
                    e3.printStackTrace();
                    LOGGER.error(e3.getMessage());
                    return;
                }
            }
        }
        Uninterruptibles.sleepUninterruptibly(15L, TimeUnit.SECONDS);
    }

    private void write(StreamsDatum streamsDatum) {
        boolean offer;
        do {
            synchronized (WebHdfsPersistReader.class) {
                offer = this.reader.persistQueue.offer(streamsDatum);
            }
            Thread.yield();
        } while (!offer);
    }

    private StreamsDatum processLine(String str) {
        String[] split = str.split(this.reader.hdfsConfiguration.getFieldDelimiter());
        if (split.length == 0) {
            return null;
        }
        String str2 = null;
        DateTime dateTime = null;
        Map<String, Object> map = null;
        String str3 = null;
        if (this.reader.hdfsConfiguration.getFields().contains("DOC") && split.length > this.reader.hdfsConfiguration.getFields().indexOf("DOC")) {
            str3 = split[this.reader.hdfsConfiguration.getFields().indexOf("DOC")];
        }
        if (this.reader.hdfsConfiguration.getFields().contains("ID") && split.length > this.reader.hdfsConfiguration.getFields().indexOf("ID")) {
            str2 = split[this.reader.hdfsConfiguration.getFields().indexOf("ID")];
        }
        if (this.reader.hdfsConfiguration.getFields().contains("TS") && split.length > this.reader.hdfsConfiguration.getFields().indexOf("TS")) {
            dateTime = parseTs(split[this.reader.hdfsConfiguration.getFields().indexOf("TS")]);
        }
        if (this.reader.hdfsConfiguration.getFields().contains("META") && split.length > this.reader.hdfsConfiguration.getFields().indexOf("META")) {
            map = parseMap(split[this.reader.hdfsConfiguration.getFields().indexOf("META")]);
        }
        StreamsDatum streamsDatum = new StreamsDatum(str3);
        streamsDatum.setId(str2);
        streamsDatum.setTimestamp(dateTime);
        streamsDatum.setMetadata(map);
        return streamsDatum;
    }

    private DateTime parseTs(String str) {
        DateTime dateTime = null;
        try {
            dateTime = new DateTime(Long.parseLong(str));
        } catch (Exception e) {
        }
        try {
            dateTime = (DateTime) this.reader.mapper.readValue(str, DateTime.class);
        } catch (Exception e2) {
        }
        return dateTime;
    }

    private Map<String, Object> parseMap(String str) {
        Map<String, Object> map = null;
        try {
            map = (Map) this.reader.mapper.convertValue((JsonNode) this.reader.mapper.readValue(str, JsonNode.class), Map.class);
        } catch (IOException e) {
            LOGGER.warn("failed in parseMap: " + e.getMessage());
        }
        return map;
    }
}
