package org.apache.bookkeeper.proto;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/proto/TestPerChannelBookieClient.class */
public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
    static Logger LOG = LoggerFactory.getLogger(TestPerChannelBookieClient.class);

    public TestPerChannelBookieClient() {
        super(1);
    }

    @Test(timeout = 60000)
    public void testConnectCloseRace() throws Exception {
        NioClientSocketChannelFactory nioClientSocketChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        OrderedSafeExecutor orderedSafeExecutor = new OrderedSafeExecutor(1);
        InetSocketAddress bookie = getBookie(0);
        AtomicLong atomicLong = new AtomicLong(0L);
        for (int i = 0; i < 1000; i++) {
            PerChannelBookieClient perChannelBookieClient = new PerChannelBookieClient(orderedSafeExecutor, nioClientSocketChannelFactory, bookie, atomicLong);
            perChannelBookieClient.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<Void>() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.1
                public void operationComplete(int i2, Void r3) {
                }
            });
            perChannelBookieClient.close();
        }
        nioClientSocketChannelFactory.releaseExternalResources();
        orderedSafeExecutor.shutdown();
    }

    @Test(timeout = 60000)
    public void testConnectRace() throws Exception {
        BookkeeperInternalCallbacks.GenericCallback<Void> genericCallback = new BookkeeperInternalCallbacks.GenericCallback<Void>() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.2
            public void operationComplete(int i, Void r3) {
            }
        };
        NioClientSocketChannelFactory nioClientSocketChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        OrderedSafeExecutor orderedSafeExecutor = new OrderedSafeExecutor(1);
        InetSocketAddress bookie = getBookie(0);
        AtomicLong atomicLong = new AtomicLong(0L);
        for (int i = 0; i < 100; i++) {
            PerChannelBookieClient perChannelBookieClient = new PerChannelBookieClient(orderedSafeExecutor, nioClientSocketChannelFactory, bookie, atomicLong);
            for (int i2 = i; i2 < 10; i2++) {
                perChannelBookieClient.connectIfNeededAndDoOp(genericCallback);
            }
            perChannelBookieClient.close();
        }
        nioClientSocketChannelFactory.releaseExternalResources();
        orderedSafeExecutor.shutdown();
    }

    @Test(timeout = 60000)
    public void testDisconnectRace() throws Exception {
        final BookkeeperInternalCallbacks.GenericCallback<Void> genericCallback = new BookkeeperInternalCallbacks.GenericCallback<Void>() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.3
            public void operationComplete(int i, Void r3) {
            }
        };
        NioClientSocketChannelFactory nioClientSocketChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        OrderedSafeExecutor orderedSafeExecutor = new OrderedSafeExecutor(1);
        final PerChannelBookieClient perChannelBookieClient = new PerChannelBookieClient(orderedSafeExecutor, nioClientSocketChannelFactory, getBookie(0), new AtomicLong(0L));
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(true);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    if (!countDownLatch.await(10L, TimeUnit.SECONDS)) {
                        TestPerChannelBookieClient.LOG.error("Disconnect thread never started");
                        atomicBoolean.set(true);
                    }
                } catch (InterruptedException e) {
                    TestPerChannelBookieClient.LOG.error("Connect thread interrupted", e);
                    Thread.currentThread().interrupt();
                    atomicBoolean2.set(false);
                }
                for (int i = 0; i < 100000 && atomicBoolean2.get(); i++) {
                    perChannelBookieClient.connectIfNeededAndDoOp(genericCallback);
                }
                atomicBoolean2.set(false);
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.5
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
                while (atomicBoolean2.get()) {
                    perChannelBookieClient.disconnect();
                }
            }
        };
        Thread thread3 = new Thread() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.6
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (atomicBoolean2.get()) {
                    synchronized (perChannelBookieClient) {
                        PerChannelBookieClient.ConnectionState connectionState = perChannelBookieClient.state;
                        Channel channel = perChannelBookieClient.channel;
                        if ((connectionState == PerChannelBookieClient.ConnectionState.CONNECTED && (channel == null || !channel.isConnected())) || (connectionState != PerChannelBookieClient.ConnectionState.CONNECTED && channel != null && channel.isConnected())) {
                            TestPerChannelBookieClient.LOG.error("State({}) and channel({}) inconsistent " + channel, connectionState, channel == null ? null : Boolean.valueOf(channel.isConnected()));
                            atomicBoolean.set(true);
                            atomicBoolean2.set(false);
                        }
                    }
                }
            }
        };
        thread.start();
        thread2.start();
        thread3.start();
        thread.join();
        thread2.join();
        thread3.join();
        assertFalse("Failure in threads, check logs", atomicBoolean.get());
        perChannelBookieClient.close();
        nioClientSocketChannelFactory.releaseExternalResources();
        orderedSafeExecutor.shutdown();
    }

    @Test(timeout = 60000)
    public void testRequestCompletesAfterDisconnectRace() throws Exception {
        ServerConfiguration killBookie = killBookie(0);
        Bookie bookie = new Bookie(killBookie) { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.7
            public ByteBuffer readEntry(long j, long j2) throws IOException, Bookie.NoLedgerException {
                try {
                    Thread.sleep(3000L);
                    return super.readEntry(j, j2);
                } catch (InterruptedException e) {
                    throw new IOException("Interrupted waiting", e);
                }
            }
        };
        this.bsConfs.add(killBookie);
        this.bs.add(startBookie(killBookie, bookie));
        NioClientSocketChannelFactory nioClientSocketChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        final OrderedSafeExecutor orderedSafeExecutor = new OrderedSafeExecutor(1);
        final PerChannelBookieClient perChannelBookieClient = new PerChannelBookieClient(orderedSafeExecutor, nioClientSocketChannelFactory, getBookie(0), new AtomicLong(0L));
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final BookkeeperInternalCallbacks.ReadEntryCallback readEntryCallback = new BookkeeperInternalCallbacks.ReadEntryCallback() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.8
            public void readEntryComplete(int i, long j, long j2, ChannelBuffer channelBuffer, Object obj) {
                countDownLatch.countDown();
            }
        };
        perChannelBookieClient.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<Void>() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.9
            public void operationComplete(final int i, Void r11) {
                if (i != 0) {
                    orderedSafeExecutor.submitOrdered(1, new SafeRunnable() { // from class: org.apache.bookkeeper.proto.TestPerChannelBookieClient.9.1
                        public void safeRun() {
                            readEntryCallback.readEntryComplete(i, 1L, 1L, (ChannelBuffer) null, (Object) null);
                        }
                    });
                } else {
                    perChannelBookieClient.readEntryAndFenceLedger(1L, "00000111112222233333".getBytes(), 1L, readEntryCallback, (Object) null);
                }
            }
        });
        Thread.sleep(1000L);
        perChannelBookieClient.disconnect();
        perChannelBookieClient.close();
        nioClientSocketChannelFactory.releaseExternalResources();
        orderedSafeExecutor.shutdown();
        assertTrue("Request should have completed", countDownLatch.await(5L, TimeUnit.SECONDS));
    }
}
