package org.apache.apex.malhar.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nonnull;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/flume/source/HdfsTestSource.class */
public class HdfsTestSource extends AbstractSource implements EventDrivenSource, Configurable {
    public static final String SOURCE_DIR = "sourceDir";
    public static final String RATE = "rate";
    public static final String INIT_DATE = "initDate";
    public Timer emitTimer;

    @Nonnull
    String directory;
    Path directoryPath;
    String initDate;
    long initTime;
    long oneDayBack;
    protected transient FileSystem fs;
    private transient Configuration configuration;
    private transient boolean finished;
    private List<Event> events;
    static byte FIELD_SEPARATOR = 2;
    private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class);
    private transient BufferedReader br = null;
    private transient int currentFile = 0;
    int rate = 2500;
    List<String> dataFiles = Lists.newArrayList();

    public HdfsTestSource() {
        Calendar calendar = Calendar.getInstance();
        calendar.add(5, -1);
        this.oneDayBack = calendar.getTimeInMillis();
        this.configuration = new Configuration();
        this.events = Lists.newArrayList();
    }

    public void configure(Context context) {
        this.directory = context.getString(SOURCE_DIR);
        this.rate = context.getInteger("rate", Integer.valueOf(this.rate)).intValue();
        this.initDate = context.getString(INIT_DATE);
        Preconditions.checkArgument(!Strings.isNullOrEmpty(this.directory));
        this.directoryPath = new Path(this.directory);
        String[] split = this.initDate.split("-");
        Preconditions.checkArgument(split.length == 3);
        Calendar calendar = Calendar.getInstance();
        calendar.set(Integer.parseInt(split[0]), Integer.parseInt(split[1]) - 1, Integer.parseInt(split[2]), 0, 0, 0);
        this.initTime = calendar.getTimeInMillis();
        try {
            Iterator<String> it = findFiles().iterator();
            while (it.hasNext()) {
                this.dataFiles.add(it.next());
            }
            if (logger.isDebugEnabled()) {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                logger.debug("settings {} {} {} {} {}", new Object[]{this.directory, Integer.valueOf(this.rate), simpleDateFormat.format(Long.valueOf(this.oneDayBack)), simpleDateFormat.format(new Date(this.initTime)), Integer.valueOf(this.currentFile)});
                Iterator<String> it2 = this.dataFiles.iterator();
                while (it2.hasNext()) {
                    logger.debug("settings add file {}", it2.next());
                }
            }
            this.fs = FileSystem.newInstance(new Path(this.directory).toUri(), this.configuration);
            this.br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(this.fs.open(new Path(this.dataFiles.get(this.currentFile))))));
            this.finished = true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private List<String> findFiles() throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        Path path = new Path(this.directory);
        FileSystem newInstance = FileSystem.newInstance(path.toUri(), this.configuration);
        try {
            try {
                logger.debug("checking for new files in {}", path);
                RemoteIterator listFiles = newInstance.listFiles(path, true);
                while (listFiles.hasNext()) {
                    Path path2 = ((FileStatus) listFiles.next()).getPath();
                    String path3 = path2.toString();
                    if (path3.endsWith(".gz")) {
                        logger.debug("new file {}", path3);
                        newArrayList.add(path2.toString());
                    }
                }
                return newArrayList;
            } catch (FileNotFoundException e) {
                logger.warn("Failed to list directory {}", path, e);
                throw new RuntimeException(e);
            }
        } finally {
            newInstance.close();
        }
    }

    public void start() {
        super.start();
        this.emitTimer = new Timer();
        final ChannelProcessor channelProcessor = getChannelProcessor();
        this.emitTimer.scheduleAtFixedRate(new TimerTask() { // from class: org.apache.apex.malhar.flume.source.HdfsTestSource.1
            /* JADX WARN: Code restructure failed: missing block: B:19:0x006e, code lost:
            
                org.apache.apex.malhar.flume.source.HdfsTestSource.logger.info("finished all files");
                r10.this$0.finished = true;
             */
            @Override // java.util.TimerTask, java.lang.Runnable
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public void run() {
                /*
                    Method dump skipped, instructions count: 317
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: org.apache.apex.malhar.flume.source.HdfsTestSource.AnonymousClass1.run():void");
            }
        }, 0L, 1000L);
    }

    public void stop() {
        this.emitTimer.cancel();
        super.stop();
    }

    static /* synthetic */ int access$308(HdfsTestSource hdfsTestSource) {
        int i = hdfsTestSource.currentFile;
        hdfsTestSource.currentFile = i + 1;
        return i;
    }
}
