package org.apache.rocketmq.streams.common.channel.impl.file;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.source.AbstractBatchSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.utils.DateUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/common/channel/impl/file/FileSource.class */
/**
 * Batch source that reads a single file, or every entry of a directory, line by line and
 * passes each line to {@code doReceiveMessage}.
 *
 * <p>When the configured path is an existing directory, {@link #initConfigurable()} creates a
 * thread pool and the files are read concurrently; otherwise the file is read on the thread
 * that calls {@link #startSource()}.
 */
public class FileSource extends AbstractBatchSource {

    /** File or directory to read; may also name a classpath resource (see {@link #getFile}). */
    @ENVDependence
    private String filePath;

    /** Not assigned anywhere in this class; closed by {@link #destroy()} when non-null. */
    protected transient BufferedReader reader;

    /** Pool used to read directory entries in parallel; {@code null} means "read inline". */
    protected transient ExecutorService executorService;

    /**
     * Line iterator over one file.
     *
     * <p>The reader is opened in the constructor and must be released with {@link #close()}.
     * Instances are not safe for use by several threads at once.
     */
    public static class FileIterator implements Iterator<String> {
        protected File file;
        /** Line fetched by {@link #hasNext()} and not yet handed out; {@code null} if none. */
        private String line;
        /** Number of physical read attempts performed so far (see {@link #getIndex()}). */
        protected int index = 0;
        protected BufferedReader reader;

        /**
         * Opens {@code file} for reading.
         *
         * @param file the file to iterate over
         * @throws FileNotFoundException if the file cannot be opened for reading
         */
        public FileIterator(File file) throws FileNotFoundException {
            this.file = file;
            this.reader = new BufferedReader(new FileReader(file));
        }

        /**
         * Reports whether another line is available, fetching it if necessary.
         *
         * <p>Repeated calls without an intervening {@link #next()} do not consume input: the
         * fetched line stays buffered until it is handed out.
         *
         * @return {@code true} if {@link #next()} will return a line
         * @throws RuntimeException wrapping the {@link IOException} if reading fails
         */
        @Override
        public boolean hasNext() {
            if (this.line != null) {
                // A line is already buffered; reading again here would silently drop it.
                return true;
            }
            try {
                this.line = this.reader.readLine();
                this.index++;
                return this.line != null;
            } catch (IOException e) {
                throw new RuntimeException("read error ", e);
            }
        }

        /**
         * Returns the number of read attempts made so far. Inside a
         * {@code while (hasNext()) { next(); }} loop this is the 1-based number of the current
         * line; the attempt that hits end-of-file is counted as well.
         *
         * @return the read-attempt counter
         */
        public int getIndex() {
            return this.index;
        }

        /**
         * Closes the underlying reader.
         *
         * @throws RuntimeException wrapping the {@link IOException} if closing fails
         */
        public void close() {
            try {
                this.reader.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Returns the next line and advances the iterator.
         *
         * @return the next line of the file, never {@code null}
         * @throws NoSuchElementException if the file has no more lines
         */
        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException("no more lines in " + this.file);
            }
            String current = this.line;
            // Clear the buffer so the following hasNext() moves on to the next line.
            this.line = null;
            return current;
        }
    }

    /**
     * Reads one {@link FileIterator} to the end, forwarding each line to the enclosing source,
     * then sends a checkpoint and a {@link BatchFinishMessage}.
     */
    protected class ReadTask implements Runnable {
        protected FileIterator fileIterator;
        /** Shared counter of lines processed across all tasks. */
        protected AtomicInteger count;
        /** Counted down exactly once when this task ends, whether it succeeded or failed. */
        private CountDownLatch countDownLatch;

        /**
         * @param fileIterator   lines to read; a {@code null} iterator makes the task a no-op
         * @param atomicInteger  counter incremented once per processed line
         * @param countDownLatch latch to count down when the task ends
         */
        public ReadTask(FileIterator fileIterator, AtomicInteger atomicInteger, CountDownLatch countDownLatch) {
            this.fileIterator = fileIterator;
            this.count = atomicInteger;
            this.countDownLatch = countDownLatch;
        }

        /**
         * Processes the file. The reader is closed and the latch is counted down even when
         * message handling throws, so a failing file neither leaks its reader nor leaves the
         * thread that awaits the latch blocked forever.
         */
        @Override
        public void run() {
            try {
                if (this.fileIterator == null) {
                    return;
                }
                try {
                    String fileName = this.fileIterator.file.getName();
                    // The 1-based line number is passed as a string alongside the file name.
                    int lineNumber = 1;
                    while (this.fileIterator.hasNext()) {
                        FileSource.this.doReceiveMessage(this.fileIterator.next(), false, fileName, lineNumber + "");
                        lineNumber++;
                        this.count.incrementAndGet();
                    }
                    FileSource.this.sendCheckpoint(fileName);
                    FileSource.this.executeMessage((Message) BatchFinishMessage.create());
                } finally {
                    this.fileIterator.close();
                }
            } finally {
                this.countDownLatch.countDown();
            }
        }
    }

    /** Creates a source without a path; set one with {@link #setFilePath(String)}. */
    public FileSource() {
        setType(ISource.TYPE);
    }

    /**
     * Creates a source for the given path.
     *
     * @param str file, directory or classpath resource to read
     */
    public FileSource(String str) {
        this();
        this.filePath = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initializes the parent and, if the configured path is an existing directory, creates a
     * fixed-size pool of {@code maxThread} threads with a task queue bounded at 1000 entries.
     * The parent's result is ignored.
     *
     * @return always {@code true}
     */
    @Override
    public boolean initConfigurable() {
        super.initConfigurable();
        File file = getFile(this.filePath);
        if (file.exists() && file.isDirectory()) {
            this.executorService = new ThreadPoolExecutor(this.maxThread, this.maxThread, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1000));
        }
        return true;
    }

    /**
     * Resolves {@code str} to a file. If no such file exists on disk, the name is looked up as
     * a classpath resource; on a hit the resource's file path is used and also stored back
     * into {@code filePath}.
     *
     * @param str path or resource name
     * @return the resolved file, which may still not exist
     */
    private File getFile(String str) {
        File file = new File(str);
        if (file.exists()) {
            return file;
        }
        URL resource = getClass().getClassLoader().getResource(str);
        if (resource == null) {
            return file;
        }
        String resourcePath = resource.getFile();
        this.filePath = resourcePath;
        return new File(resourcePath);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads every file produced by {@link #createIteratorList()}, on the pool when one exists
     * and inline otherwise, waits for all tasks to finish and prints a summary line.
     *
     * <p>Failures while dispatching are printed and do not propagate. If the waiting thread is
     * interrupted its interrupt status is restored before returning.
     *
     * @return always {@code true}
     */
    @Override
    public boolean startSource() {
        LinkedBlockingQueue<FileIterator> pending = createIteratorList();
        AtomicInteger lineCount = new AtomicInteger(0);
        long startMillis = System.currentTimeMillis();
        CountDownLatch finished = new CountDownLatch(pending.size());
        // Read the field once so the null check and the call below use the same instance.
        ExecutorService executor = this.executorService;
        FileIterator current = null;
        try {
            while ((current = pending.poll()) != null) {
                ReadTask task = new ReadTask(current, lineCount, finished);
                if (executor != null) {
                    executor.execute(task);
                } else {
                    task.run();
                }
            }
            finished.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
            // Dispatch stopped early: release the readers that no task will ever close.
            // Closing an already closed reader is harmless.
            closeQuietly(current);
            closeAll(pending);
        }
        System.out.println("process data cost :" + (System.currentTimeMillis() - startMillis) + ", the count is " + lineCount.get() + " now " + DateUtil.getCurrentTimeString());
        return true;
    }

    /**
     * Opens an iterator for the configured file, or one per entry when the path is a
     * directory. The returned queue holds at most 1000 iterators.
     *
     * <p>If anything fails part-way, every reader opened so far is closed before the exception
     * is thrown.
     *
     * @return queue of opened iterators; the caller is responsible for closing them
     * @throws RuntimeException if the path does not exist, the directory cannot be listed, an
     *                          entry cannot be opened, or the queue capacity is exceeded
     */
    protected LinkedBlockingQueue<FileIterator> createIteratorList() {
        LinkedBlockingQueue<FileIterator> queue = new LinkedBlockingQueue<>(1000);
        File file = getFile(this.filePath);
        if (!file.exists()) {
            throw new RuntimeException("filePath not exist.the filePath is " + this.filePath);
        }
        // Most recently opened iterator; tracked separately because it may not have made it
        // into the queue when a failure occurs.
        FileIterator opened = null;
        try {
            if (!file.isDirectory()) {
                opened = new FileIterator(file);
                queue.put(opened);
                return queue;
            }
            File[] children = file.listFiles();
            if (children == null) {
                // listFiles() returns null on an I/O error or if the path stopped being a directory.
                throw new IOException("can not list files of directory " + this.filePath);
            }
            for (File child : children) {
                opened = new FileIterator(child);
                queue.add(opened);
            }
            return queue;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            closeQuietly(opened);
            closeAll(queue);
            throw new RuntimeException(e);
        }
    }

    /** Closes every iterator in {@code iterators}, ignoring close failures. */
    private static void closeAll(Iterable<FileIterator> iterators) {
        for (FileIterator iterator : iterators) {
            closeQuietly(iterator);
        }
    }

    /** Closes {@code iterator} if non-null; used only on paths that are already failing. */
    private static void closeQuietly(FileIterator iterator) {
        if (iterator == null) {
            return;
        }
        try {
            iterator.close();
        } catch (RuntimeException ignored) {
            // Best-effort cleanup: the original failure is the one worth reporting.
        }
    }

    /**
     * Releases the resources held by this source: shuts down the thread pool (tasks already
     * submitted are allowed to finish) and closes {@code reader} if it is set.
     *
     * @throws RuntimeException wrapping the {@link IOException} if closing the reader fails
     */
    @Override
    public void destroy() {
        // Shut the pool down first so a reader close failure cannot leave its threads alive.
        ExecutorService executor = this.executorService;
        if (executor != null) {
            this.executorService = null;
            executor.shutdown();
        }
        try {
            if (this.reader != null) {
                this.reader.close();
            }
        } catch (IOException e) {
            throw new RuntimeException("close error " + this.filePath, e);
        }
    }

    public String getFilePath() {
        return this.filePath;
    }

    public void setFilePath(String str) {
        this.filePath = str;
    }
}
