package org.apache.ratis.examples.filestore.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/ratis/examples/filestore/cli/DataStream.class
 */
@Parameters(commandDescription = "Load Generator for FileStore DataStream")
/* loaded from: input_file:ratis-examples-3.0.1.jar:org/apache/ratis/examples/filestore/cli/DataStream.class */
public class DataStream extends Client {
    private static final String DESCRIPTION = "[DirectByteBuffer, MappedByteBuffer, NettyFileRegion]";

    @Parameter(names = {"--type"}, description = DESCRIPTION, required = true)
    private String dataStreamType;

    @Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero,-1 means on sync", required = true)
    private int syncSize;

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/examples/filestore/cli/DataStream$DirectByteBufferType.class
     */
    /* loaded from: input_file:ratis-examples-3.0.1.jar:org/apache/ratis/examples/filestore/cli/DataStream$DirectByteBufferType.class */
    static class DirectByteBufferType extends TransferType {
        DirectByteBufferType(String str, DataStream dataStream) {
            super(str, dataStream);
        }

        @Override // org.apache.ratis.examples.filestore.cli.DataStream.TransferType
        long write(FileChannel fileChannel, DataStreamOutput dataStreamOutput, long j, List<CompletableFuture<DataStreamReply>> list) throws IOException {
            int bufferSize = getBufferSize();
            ByteBuf directBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize);
            int writeBytes = directBuffer.writeBytes(fileChannel, bufferSize);
            if (writeBytes < 0) {
                throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this + ". The channel has reached end-of-stream at " + j);
            }
            if (writeBytes > 0) {
                CompletableFuture<DataStreamReply> writeAsync = isSync(j + ((long) writeBytes)) ? dataStreamOutput.writeAsync(directBuffer.nioBuffer(), new WriteOption[]{StandardWriteOption.SYNC}) : dataStreamOutput.writeAsync(directBuffer.nioBuffer(), new WriteOption[0]);
                directBuffer.getClass();
                writeAsync.thenRun(directBuffer::release);
                list.add(writeAsync);
            }
            return writeBytes;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/examples/filestore/cli/DataStream$MappedByteBufferType.class
     */
    /* loaded from: input_file:ratis-examples-3.0.1.jar:org/apache/ratis/examples/filestore/cli/DataStream$MappedByteBufferType.class */
    static class MappedByteBufferType extends TransferType {
        MappedByteBufferType(String str, DataStream dataStream) {
            super(str, dataStream);
        }

        @Override // org.apache.ratis.examples.filestore.cli.DataStream.TransferType
        long write(FileChannel fileChannel, DataStreamOutput dataStreamOutput, long j, List<CompletableFuture<DataStreamReply>> list) throws IOException {
            MappedByteBuffer map = fileChannel.map(FileChannel.MapMode.READ_ONLY, j, getPacketSize(j));
            int remaining = map.remaining();
            list.add(isSync(j + ((long) remaining)) ? dataStreamOutput.writeAsync(map, new WriteOption[]{StandardWriteOption.SYNC}) : dataStreamOutput.writeAsync(map, new WriteOption[0]));
            return remaining;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/examples/filestore/cli/DataStream$NettyFileRegionType.class
     */
    /* loaded from: input_file:ratis-examples-3.0.1.jar:org/apache/ratis/examples/filestore/cli/DataStream$NettyFileRegionType.class */
    static class NettyFileRegionType extends TransferType {
        NettyFileRegionType(String str, DataStream dataStream) {
            super(str, dataStream);
        }

        @Override // org.apache.ratis.examples.filestore.cli.DataStream.TransferType
        long write(FileChannel fileChannel, DataStreamOutput dataStreamOutput, long j, List<CompletableFuture<DataStreamReply>> list) {
            long packetSize = getPacketSize(j);
            list.add(isSync(j + packetSize) ? dataStreamOutput.writeAsync(getFile(), j, packetSize, new WriteOption[]{StandardWriteOption.SYNC}) : dataStreamOutput.writeAsync(getFile(), j, packetSize, new WriteOption[0]));
            return packetSize;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/examples/filestore/cli/DataStream$TransferType.class
     */
    /* loaded from: input_file:ratis-examples-3.0.1.jar:org/apache/ratis/examples/filestore/cli/DataStream$TransferType.class */
    public static abstract class TransferType {
        private final String path;
        private final File file;
        private final long fileSize;
        private final int bufferSize;
        private final long syncSize;
        private long syncPosition = 0;

        TransferType(String str, DataStream dataStream) {
            this.path = str;
            this.file = new File(str);
            this.fileSize = dataStream.getFileSizeInBytes();
            this.bufferSize = dataStream.getBufferSizeInBytes();
            this.syncSize = dataStream.getSyncSize();
            long length = this.file.length();
            Preconditions.assertTrue(length == this.fileSize, () -> {
                return "Unexpected file size: expected size is " + this.fileSize + " but actual size is " + length + ", path=" + str;
            });
        }

        File getFile() {
            return this.file;
        }

        int getBufferSize() {
            return this.bufferSize;
        }

        long getPacketSize(long j) {
            return Math.min(this.bufferSize, this.fileSize - j);
        }

        boolean isSync(long j) {
            if (this.syncSize <= 0) {
                return false;
            }
            if (j < this.fileSize && j - this.syncPosition < this.syncSize) {
                return false;
            }
            this.syncPosition = j;
            return true;
        }

        List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient fileStoreClient, RoutingTable routingTable) throws IOException {
            if (this.fileSize <= 0) {
                return Collections.emptyList();
            }
            ArrayList arrayList = new ArrayList();
            DataStreamOutput streamOutput = fileStoreClient.getStreamOutput(this.file.getName(), this.fileSize, routingTable);
            try {
                try {
                    FileChannel newFileChannel = FileUtils.newFileChannel(this.file, new OpenOption[]{StandardOpenOption.READ});
                    Throwable th = null;
                    long j = 0;
                    while (j < this.fileSize) {
                        try {
                            try {
                                j += write(newFileChannel, streamOutput, j, arrayList);
                            } finally {
                            }
                        } catch (Throwable th2) {
                            if (newFileChannel != null) {
                                if (th != null) {
                                    try {
                                        newFileChannel.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    newFileChannel.close();
                                }
                            }
                            throw th2;
                        }
                    }
                    if (newFileChannel != null) {
                        if (0 != 0) {
                            try {
                                newFileChannel.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            newFileChannel.close();
                        }
                    }
                    return arrayList;
                } finally {
                    arrayList.add(streamOutput.closeAsync());
                }
            } catch (Throwable th5) {
                throw new IOException("Failed to transfer " + this.path);
            }
        }

        abstract long write(FileChannel fileChannel, DataStreamOutput dataStreamOutput, long j, List<CompletableFuture<DataStreamReply>> list) throws IOException;

        public String toString() {
            return JavaUtils.getClassSimpleName(getClass()) + "{" + this.path + ", size=" + this.fileSize + "}";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/examples/filestore/cli/DataStream$Type.class
     */
    /* loaded from: input_file:ratis-examples-3.0.1.jar:org/apache/ratis/examples/filestore/cli/DataStream$Type.class */
    public enum Type {
        DirectByteBuffer(DirectByteBufferType::new),
        MappedByteBuffer(MappedByteBufferType::new),
        NettyFileRegion(NettyFileRegionType::new);

        private final BiFunction<String, DataStream, TransferType> constructor;

        Type(BiFunction biFunction) {
            this.constructor = biFunction;
        }

        BiFunction<String, DataStream, TransferType> getConstructor() {
            return this.constructor;
        }

        static Type valueOfIgnoreCase(String str) {
            for (Type type : values()) {
                if (type.name().equalsIgnoreCase(str)) {
                    return type;
                }
            }
            return null;
        }
    }

    public DataStream() {
        String obj = Arrays.asList(Type.values()).toString();
        Preconditions.assertTrue(obj.equals(DESCRIPTION), () -> {
            return "Unexpected description: [DirectByteBuffer, MappedByteBuffer, NettyFileRegion] does not equal to the expected string " + obj;
        });
        this.dataStreamType = Type.NettyFileRegion.name();
        this.syncSize = -1;
    }

    int getSyncSize() {
        return this.syncSize;
    }

    private boolean checkParam() {
        if (this.syncSize != -1 && this.syncSize % getBufferSizeInBytes() != 0) {
            System.err.println("Error: syncSize % bufferSize should be zero");
            return false;
        }
        if (Type.valueOfIgnoreCase(this.dataStreamType) != null) {
            return true;
        }
        System.err.println("Error: dataStreamType should be one of [DirectByteBuffer, MappedByteBuffer, NettyFileRegion]");
        return false;
    }

    @Override // org.apache.ratis.examples.filestore.cli.Client
    protected void operation(List<FileStoreClient> list) throws IOException, ExecutionException, InterruptedException {
        if (!checkParam()) {
            stop(list);
        }
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(getNumThread());
        List<String> generateFiles = generateFiles(newFixedThreadPool);
        dropCache();
        System.out.println("Starting DataStream write now ");
        RoutingTable routingTable = getRoutingTable(Arrays.asList(getPeers()), getPrimary());
        long currentTimeMillis = System.currentTimeMillis();
        long waitStreamFinish = waitStreamFinish(streamWrite(generateFiles, list, routingTable, newFixedThreadPool));
        long currentTimeMillis2 = System.currentTimeMillis();
        System.out.println("Total files written: " + getNumFiles());
        System.out.println("Each files size: " + getFileSizeInBytes());
        System.out.println("Total data written: " + waitStreamFinish + " bytes");
        System.out.println("Total time taken: " + (currentTimeMillis2 - currentTimeMillis) + " millis");
        stop(list);
    }

    private Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> streamWrite(List<String> list, List<FileStoreClient> list2, RoutingTable routingTable, ExecutorService executorService) {
        HashMap hashMap = new HashMap();
        int i = 0;
        for (String str : list) {
            CompletableFuture completableFuture = new CompletableFuture();
            FileStoreClient fileStoreClient = list2.get(i % list2.size());
            i++;
            CompletableFuture.supplyAsync(() -> {
                long length = new File(str).length();
                Preconditions.assertTrue(length == getFileSizeInBytes(), "Unexpected file size: expected size is " + getFileSizeInBytes() + " but actual size is " + length);
                try {
                    completableFuture.complete(((Type) Optional.ofNullable(Type.valueOfIgnoreCase(this.dataStreamType)).orElseThrow(IllegalStateException::new)).getConstructor().apply(str, this).transfer(fileStoreClient, routingTable));
                } catch (IOException e) {
                    completableFuture.completeExceptionally(e);
                }
                return completableFuture;
            }, executorService);
            hashMap.put(str, completableFuture);
        }
        return hashMap;
    }

    private long waitStreamFinish(Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> map) throws ExecutionException, InterruptedException {
        long j = 0;
        Iterator<CompletableFuture<List<CompletableFuture<DataStreamReply>>>> it = map.values().iterator();
        while (it.hasNext()) {
            long j2 = 0;
            Iterator<CompletableFuture<DataStreamReply>> it2 = it.next().get().iterator();
            while (it2.hasNext()) {
                j2 += it2.next().join().getBytesWritten();
            }
            if (j2 != getFileSizeInBytes()) {
                System.out.println("File written:" + j2 + " does not match expected:" + getFileSizeInBytes());
            }
            j += j2;
        }
        return j;
    }
}
