/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;

/**
 * Read strategy that emits the raw bytes of a file as {@link SeaTunnelRow}s.
 *
 * <p>Every row has three fields, described by {@link #binaryRowType}: the byte payload
 * ({@code data}), the file's path relative to the configured base path ({@code relativePath}),
 * and the zero-based position of the payload inside the file ({@code partIndex}).
 *
 * <p>A file is emitted either as a single row holding its whole content (complete-file mode) or
 * as a sequence of rows holding at most {@code binaryChunkSize} bytes each.
 */
public class BinaryReadStrategy
extends AbstractReadStrategy {
    /** Row layout produced by this strategy: {@code data}, {@code relativePath}, {@code partIndex}. */
    public static SeaTunnelRowType binaryRowType = new SeaTunnelRowType(new String[]{"data", "relativePath", "partIndex"}, new SeaTunnelDataType[]{PrimitiveByteArrayType.INSTANCE, BasicType.STRING_TYPE, BasicType.LONG_TYPE});

    /** Largest chunk size accepted from the configuration: 100MB. */
    private static final int MAX_BINARY_CHUNK_SIZE = 100 * 1024 * 1024;

    /** Configured source path; relative paths of emitted rows are computed against it. */
    private File basePath;
    /** Maximum number of payload bytes per row when reading in chunks. */
    private int binaryChunkSize = (Integer)FileBaseSourceOptions.BINARY_CHUNK_SIZE.defaultValue();
    /** When {@code true}, each file is emitted as one row containing its whole content. */
    private boolean completeFileMode = (Boolean)FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.defaultValue();

    /**
     * Initializes the strategy from the plugin configuration.
     *
     * <p>The chunk size and complete-file mode are only overridden (and the chunk size only
     * validated) when the corresponding option is present in the configuration.
     *
     * @param conf Hadoop configuration forwarded to the parent strategy
     * @throws IllegalArgumentException if a configured chunk size is not positive or exceeds 100MB
     */
    @Override
    public void init(HadoopConf conf) {
        super.init(conf);
        this.basePath = new File(this.pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key()));
        if (this.pluginConfig.hasPath(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key())) {
            this.binaryChunkSize = this.pluginConfig.getInt(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key());
            if (this.binaryChunkSize <= 0) {
                throw new IllegalArgumentException("Binary chunk size must be positive, got: " + this.binaryChunkSize);
            }
            if (this.binaryChunkSize > MAX_BINARY_CHUNK_SIZE) {
                throw new IllegalArgumentException("Binary chunk size too large (max 100MB), got: " + this.binaryChunkSize);
            }
        }
        if (this.pluginConfig.hasPath(FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.key())) {
            this.completeFileMode = this.pluginConfig.getBoolean(FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.key());
        }
    }

    /**
     * Reads the file at {@code path} and emits its content to {@code output}.
     *
     * @param path path of the file to read
     * @param tableId table identifier set on every emitted row
     * @param output collector receiving the rows
     * @throws IOException if the file cannot be opened or read
     */
    @Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output) throws IOException, FileConnectorException {
        try (FSDataInputStream inputStream = this.hadoopFileSystemProxy.getInputStream(path);){
            String relativePath = this.resolveRelativePath(path);
            if (this.completeFileMode) {
                this.readCompleteFile(inputStream, relativePath, tableId, output);
            } else {
                this.readFileInChunks(inputStream, relativePath, tableId, output);
            }
        }
    }

    /**
     * Computes the path reported in the {@code relativePath} field for the file at {@code path}.
     *
     * <p>If the base path itself is a file, its name is used. Otherwise the portion of
     * {@code path} following the base path is used, without a leading separator.
     */
    private String resolveRelativePath(String path) throws IOException {
        String baseAbsolutePath = this.basePath.getAbsolutePath();
        if (this.hadoopFileSystemProxy.isFile(baseAbsolutePath)) {
            return this.basePath.getName();
        }
        int baseIndex = path.indexOf(baseAbsolutePath);
        if (baseIndex < 0) {
            // The base path does not occur in this path; computing a substring from index -1
            // would yield a wrong path or fail, so fall back to the bare file name.
            return new File(path).getName();
        }
        String relativePath = path.substring(baseIndex + baseAbsolutePath.length());
        if (relativePath.startsWith(File.separator)) {
            relativePath = relativePath.substring(File.separator.length());
        }
        return relativePath;
    }

    /** Emits the whole stream content as a single row with part index 0. */
    private void readCompleteFile(InputStream inputStream, String relativePath, String tableId, Collector<SeaTunnelRow> output) throws IOException {
        byte[] fileContent = IOUtils.toByteArray(inputStream);
        this.emitRow(fileContent, relativePath, 0L, tableId, output);
    }

    /**
     * Emits the stream content as consecutive rows of {@code binaryChunkSize} bytes; only the
     * last row may be shorter. An empty stream produces no rows.
     */
    private void readFileInChunks(InputStream inputStream, String relativePath, String tableId, Collector<SeaTunnelRow> output) throws IOException {
        long partIndex = 0L;
        while (true) {
            // A fresh array per chunk: the emitted row keeps a reference to it.
            byte[] chunk = new byte[this.binaryChunkSize];
            // IOUtils.read keeps reading until the array is full or the stream ends, so a
            // short read in the middle of the stream does not produce a truncated chunk.
            int filled = IOUtils.read(inputStream, chunk);
            if (filled == 0) {
                return;
            }
            boolean endOfStream = filled < chunk.length;
            if (endOfStream) {
                chunk = Arrays.copyOf(chunk, filled);
            }
            this.emitRow(chunk, relativePath, partIndex, tableId, output);
            if (endOfStream) {
                return;
            }
            ++partIndex;
        }
    }

    /** Builds one row from the given payload and metadata and hands it to the collector. */
    private void emitRow(byte[] data, String relativePath, long partIndex, String tableId, Collector<SeaTunnelRow> output) {
        SeaTunnelRow row = new SeaTunnelRow(new Object[]{data, relativePath, partIndex});
        row.setTableId(tableId);
        output.collect(row);
    }

    /**
     * Returns the fixed row type of this strategy, regardless of the file.
     *
     * @param path ignored
     * @return {@link #binaryRowType}
     */
    @Override
    public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
        return binaryRowType;
    }
}

