package org.apache.hadoop.ozone.freon;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.concurrent.TimedSemaphore;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

@CommandLine.Command(name = "falg", aliases = {"follower-append-log-generator"}, description = {"Generate append log entries to a follower server"}, versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true, showDefaultValues = true)
/* loaded from: input_file:org/apache/hadoop/ozone/freon/FollowerAppendLogEntryGenerator.class */
public class FollowerAppendLogEntryGenerator extends BaseAppendLogGenerator implements Callable<Void>, StreamObserver<RaftProtos.AppendEntriesReplyProto> {
    public static final String FAKE_LEADER_ADDDRESS = "localhost:1234";
    private static final Logger LOG = LoggerFactory.getLogger(FollowerAppendLogEntryGenerator.class);
    private static final String FAKE_LEADER_ID = "ffffffff-df33-4a20-8e1f-ffffffff6be5";

    @CommandLine.Option(names = {"-s", "--size"}, description = {"Size of the generated chunks (in bytes)"}, defaultValue = "1024")
    private int chunkSize;

    @CommandLine.Option(names = {"-b", "--batching"}, description = {"Number of write chunks requests in one AppendLogEntry"}, defaultValue = "2")
    private int batching;

    @CommandLine.Option(names = {"-i", "--next-index"}, description = {"The next index in the term 2 to continue a test. (If zero, a new ratis ring will be intialized with configureGroup call and vote)"}, defaultValue = "0")
    private long nextIndex;

    @CommandLine.Option(names = {"--rate-limit"}, description = {"Maximum number of requests per second (if bigger than 0)"}, defaultValue = "0")
    private int rateLimit;

    @CommandLine.Option(names = {"--inflight-limit"}, description = {"Maximum in-flight messages"}, defaultValue = "10")
    private int inflightLimit;
    private TimedSemaphore rateLimiter;
    private RaftProtos.RaftPeerProto requestor;
    private RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub stub;
    private ByteString dataToWrite;
    private Timer timer;
    private BlockingQueue<Long> inFlightMessages;
    private StreamObserver<RaftProtos.AppendEntriesRequestProto> sender;

    @CommandLine.Option(names = {"-l", "--pipeline"}, description = {"Pipeline to use. By default the first RATIS/THREE pipeline will be used."}, defaultValue = "96714307-4bd7-42b5-a65d-e1b13b4ca5c0")
    private String pipelineId = "96714307-4bd7-42b5-a65d-e1b13b4ca5c0";
    private long term = 2;
    private Random callIdRandom = new Random();

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Void call() throws Exception {
        this.inFlightMessages = new LinkedBlockingQueue(this.inflightLimit);
        this.timer = getMetrics().timer("append-entry");
        this.dataToWrite = ByteString.copyFrom(RandomStringUtils.randomAscii(this.chunkSize).getBytes(StandardCharsets.UTF_8));
        setServerIdFromFile(createOzoneConfiguration());
        Preconditions.assertTrue(getThreadNo() == 1, "This test should be executed from one thread");
        this.requestor = RaftProtos.RaftPeerProto.newBuilder().setId(RaftPeerId.valueOf(FAKE_LEADER_ID).toByteString()).setAddress(FAKE_LEADER_ADDDRESS).build();
        NettyChannelBuilder forTarget = NettyChannelBuilder.forTarget(this.serverAddress);
        forTarget.negotiationType(NegotiationType.PLAINTEXT);
        this.stub = RaftServerProtocolServiceGrpc.newStub(forTarget.build());
        if (this.rateLimit != 0) {
            this.rateLimiter = new TimedSemaphore(1L, TimeUnit.SECONDS, this.rateLimit);
        }
        init();
        this.sender = this.stub.appendEntries(this);
        if (this.nextIndex == 0) {
            configureGroup();
            RaftProtos.RequestVoteReplyProto requestVoteReplyProto = requestVote().get(1000L, TimeUnit.SECONDS);
            LOG.info("Datanode answered to the vote request: {}", requestVoteReplyProto);
            if (!requestVoteReplyProto.getServerReply().getSuccess()) {
                throw new RuntimeException("Datanode didn't vote to the fake freon leader.");
            }
            long nextLong = this.callIdRandom.nextLong();
            this.inFlightMessages.put(Long.valueOf(nextLong));
            this.sender.onNext(createInitialLogEntry(nextLong));
            this.nextIndex = 1L;
        }
        runTests(this::sendAppendLogEntryRequest);
        if (this.rateLimiter == null) {
            return null;
        }
        this.rateLimiter.shutdown();
        return null;
    }

    private void sendAppendLogEntryRequest(long j) {
        this.timer.time(() -> {
            try {
                long nextLong = this.callIdRandom.nextLong();
                this.inFlightMessages.put(Long.valueOf(nextLong));
                this.sender.onNext(createAppendLogEntry(j, nextLong));
            } catch (Exception e) {
                LOG.error("Error while sending new append entry request (HB) to the follower", e);
            }
        });
    }

    /*  JADX ERROR: Failed to decode insn: 0x003D: MOVE_MULTI, method: org.apache.hadoop.ozone.freon.FollowerAppendLogEntryGenerator.createAppendLogEntry(long, long):org.apache.ratis.proto.RaftProtos$AppendEntriesRequestProto
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[9]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    private org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto createAppendLogEntry(long r10, long r12) {
        /*
            Method dump skipped, instructions count: 399
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.hadoop.ozone.freon.FollowerAppendLogEntryGenerator.createAppendLogEntry(long, long):org.apache.ratis.proto.RaftProtos$AppendEntriesRequestProto");
    }

    private void configureGroup() throws IOException {
        ClientId randomId = ClientId.randomId();
        RaftGroupId valueOf = RaftGroupId.valueOf(UUID.fromString(this.pipelineId));
        RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId(this.serverId);
        RaftGroup valueOf2 = RaftGroup.valueOf(valueOf, new RaftPeer[]{new RaftPeer(RaftPeerId.valueOf(this.serverId), this.serverAddress), new RaftPeer(RaftPeerId.valueOf(FAKE_LEADER_ID), FAKE_LEADER_ADDDRESS)});
        LOG.info("Group is configured in the RAFT server (one follower, one fake leader): {}", RaftClient.newBuilder().setClientId(randomId).setProperties(new RaftProperties(true)).setRaftGroup(valueOf2).build().groupAdd(valueOf2, raftPeerId));
    }

    public void onNext(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
        long callId = appendEntriesReplyProto.getServerReply().getCallId();
        if (!this.inFlightMessages.remove(Long.valueOf(callId))) {
            LOG.warn("Received message with callId which was not used to send message: {}", Long.valueOf(callId));
            LOG.info("{}", appendEntriesReplyProto);
        }
        long followerCommit = appendEntriesReplyProto.getFollowerCommit();
        if (followerCommit % 1000 == 0) {
            long j = getAttemptCounter().get();
            if (j - followerCommit > this.batching * 3) {
                LOG.warn("Last committed index ({}) is behind the current index ({}) on the client side.", Long.valueOf(followerCommit), Long.valueOf(j));
            }
        }
    }

    public void onError(Throwable th) {
        LOG.error("Error on sending message", th);
    }

    public void onCompleted() {
    }

    private RaftProtos.AppendEntriesRequestProto createInitialLogEntry(long j) {
        return RaftProtos.AppendEntriesRequestProto.newBuilder().setLeaderTerm(this.term).addEntries(RaftProtos.LogEntryProto.newBuilder().setTerm(this.term).setIndex(0L).setConfigurationEntry(RaftProtos.RaftConfigurationProto.newBuilder().addPeers(RaftProtos.RaftPeerProto.newBuilder().setId(RaftPeerId.valueOf(this.serverAddress).toByteString()).setAddress(this.serverAddress).build()).addPeers(this.requestor).build()).build()).setServerRequest(createServerRequest(j)).build();
    }

    private CompletableFuture<RaftProtos.RequestVoteReplyProto> requestVote() {
        final CompletableFuture<RaftProtos.RequestVoteReplyProto> completableFuture = new CompletableFuture<>();
        this.stub.requestVote(RaftProtos.RequestVoteRequestProto.newBuilder().setServerRequest(createServerRequest(this.callIdRandom.nextLong())).setCandidateLastEntry(RaftProtos.TermIndexProto.newBuilder().setIndex(0L).setTerm(this.term).build()).build(), new StreamObserver<RaftProtos.RequestVoteReplyProto>() { // from class: org.apache.hadoop.ozone.freon.FollowerAppendLogEntryGenerator.1
            public void onNext(RaftProtos.RequestVoteReplyProto requestVoteReplyProto) {
                completableFuture.complete(requestVoteReplyProto);
            }

            public void onError(Throwable th) {
                completableFuture.completeExceptionally(th);
            }

            public void onCompleted() {
            }
        });
        return completableFuture;
    }

    private RaftProtos.RaftRpcRequestProto createServerRequest(long j) {
        return RaftProtos.RaftRpcRequestProto.newBuilder().setRaftGroupId(RaftProtos.RaftGroupIdProto.newBuilder().setId(RaftGroupId.valueOf(UUID.fromString(this.pipelineId)).toByteString()).build()).setRequestorId(this.requestor.getId()).setCallId(j).build();
    }
}
