package org.apache.ratis.server.impl;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ratis.BaseTest;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.AsyncApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
import org.apache.ratis.examples.arithmetic.expression.BinaryExpression;
import org.apache.ratis.examples.arithmetic.expression.DoubleValue;
import org.apache.ratis.examples.arithmetic.expression.Expression;
import org.apache.ratis.examples.arithmetic.expression.Variable;
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.Slf4jUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/ratis/server/impl/TestReadAfterWrite.class */
public class TestReadAfterWrite extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/server/impl/TestReadAfterWrite$BlockingCode.class */
    public static class BlockingCode implements CodeInjectionForTesting.Code {
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        BlockingCode() {
        }

        void complete() {
            this.future.complete(null);
        }

        public boolean execute(Object obj, Object obj2, Object... objArr) {
            boolean z = !this.future.isDone();
            if (z) {
                LOG.info("Server {} blocks client {}: {}", new Object[]{obj, obj2, objArr[0]});
            }
            this.future.join();
            if (!z) {
                return true;
            }
            LOG.info("Server {} unblocks client {}", obj, obj2);
            return true;
        }
    }

    @Before
    public void setup() {
        Slf4jUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.DEBUG);
        Slf4jUtils.setLogLevel(CodeInjectionForTesting.LOG, Level.DEBUG);
        Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
        RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG);
        RaftProperties properties = getProperties();
        properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, ArithmeticStateMachine.class, StateMachine.class);
        RaftServerConfigKeys.Read.setOption(properties, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
    }

    @Test
    public void testReadAfterWriteSingleServer() throws Exception {
        runWithNewCluster(1, miniRaftClusterWithGrpc -> {
            RaftClient createClient = miniRaftClusterWithGrpc.createClient();
            Throwable th = null;
            try {
                try {
                    runTestReadAfterWrite(createClient);
                    if (createClient != null) {
                        if (0 == 0) {
                            createClient.close();
                            return;
                        }
                        try {
                            createClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (createClient != null) {
                    if (th != null) {
                        try {
                            createClient.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        createClient.close();
                    }
                }
                throw th4;
            }
        });
    }

    @Test
    public void testReadAfterWrite() throws Exception {
        runWithNewCluster(3, miniRaftClusterWithGrpc -> {
            RaftClient createClient = miniRaftClusterWithGrpc.createClient();
            Throwable th = null;
            try {
                try {
                    runTestReadAfterWrite(createClient);
                    if (createClient != null) {
                        if (0 == 0) {
                            createClient.close();
                            return;
                        }
                        try {
                            createClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (createClient != null) {
                    if (th != null) {
                        try {
                            createClient.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        createClient.close();
                    }
                }
                throw th4;
            }
        });
    }

    void runTestReadAfterWrite(RaftClient raftClient) throws Exception {
        Variable variable = new Variable("a");
        BinaryExpression apply = BinaryExpression.Op.ADD.apply(variable, new DoubleValue(2.0d));
        AsyncApi async = raftClient.async();
        Assert.assertTrue(((RaftClientReply) async.send(variable.assign(new DoubleValue(10.0d))).join()).isSuccess());
        Message message = Expression.Utils.toMessage(variable);
        assertReply(async.sendReadOnly(message), 10);
        BlockingCode blockingCode = new BlockingCode();
        CodeInjectionForTesting.put(RaftServerImpl.APPEND_TRANSACTION, blockingCode);
        CompletableFuture send = async.send(variable.assign(apply));
        CompletableFuture<RaftClientReply> sendReadOnlyUnordered = async.sendReadOnlyUnordered(message);
        CompletableFuture<RaftClientReply> sendReadAfterWrite = async.sendReadAfterWrite(message);
        Thread.sleep(1000L);
        assertReply(sendReadOnlyUnordered, 10);
        this.LOG.info("readAfterWrite.get");
        try {
            RaftClientReply raftClientReply = sendReadAfterWrite.get(100L, TimeUnit.MILLISECONDS);
            Assert.fail("result=" + Expression.Utils.bytes2Expression(raftClientReply.getMessage().getContent().toByteArray(), 0) + ", reply=" + raftClientReply);
        } catch (TimeoutException e) {
            this.LOG.info("Good", e);
        }
        Assert.assertFalse(send.isDone());
        Assert.assertFalse(sendReadAfterWrite.isDone());
        blockingCode.complete();
        assertReply(sendReadAfterWrite, 12);
    }

    void assertReply(CompletableFuture<RaftClientReply> completableFuture, int i) {
        this.LOG.info("assertReply, expected {}", Integer.valueOf(i));
        RaftClientReply join = completableFuture.join();
        Assert.assertTrue(join.isSuccess());
        this.LOG.info("reply {}", join);
        Assert.assertEquals(i, (int) Expression.Utils.bytes2Expression(join.getMessage().getContent().toByteArray(), 0).evaluate((Map) null).doubleValue());
    }
}
