package co.cask.cdap.messaging.service;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.metrics.MetricsCollector;
import co.cask.cdap.common.utils.TimeProvider;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/messaging/service/ConcurrentMessageWriterTest.class */
public class ConcurrentMessageWriterTest {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentMessageWriterTest.class);

    /**
     * Immutable value holder for one entry produced by
     * {@code TestStoreRequestWriter} and later consumed by its {@code doWrite}.
     * It simply carries the six values given to the constructor.
     */
    public static final class TestEntry {
        private final TopicId topicId;
        private final boolean transactional;
        private final long transactionWritePointer;
        private final long writeTimestamp;
        private final short sequenceId;
        private final byte[] payload;

        /**
         * Stores the given values as-is; the payload array is not copied.
         */
        private TestEntry(TopicId topicId, boolean transactional, long transactionWritePointer,
                          long writeTimestamp, short sequenceId, @Nullable byte[] payload) {
            this.topicId = topicId;
            this.transactional = transactional;
            this.transactionWritePointer = transactionWritePointer;
            this.writeTimestamp = writeTimestamp;
            this.sequenceId = sequenceId;
            this.payload = payload;
        }

        /** Returns the topic this entry belongs to. */
        public TopicId getTopicId() {
            return topicId;
        }

        /** Returns the transactional flag supplied at construction. */
        public boolean isTransactional() {
            return transactional;
        }

        /** Returns the transaction write pointer supplied at construction. */
        public long getTransactionWritePointer() {
            return transactionWritePointer;
        }

        /** Returns the write timestamp supplied at construction. */
        public long getWriteTimestamp() {
            return writeTimestamp;
        }

        /** Returns the sequence id supplied at construction. */
        public short getSequenceId() {
            return sequenceId;
        }

        /** Returns the same payload array instance given at construction, or {@code null}. */
        @Nullable
        public byte[] getPayload() {
            return payload;
        }
    }

    /**
     * A {@link StoreRequest} for tests whose payloads come from an iterator of strings;
     * each string is converted with {@code Bytes.toBytes} when requested.
     */
    private static final class TestStoreRequest extends StoreRequest {
        private final Iterator<String> payloads;

        /** Creates a request that yields the given strings in list order. */
        protected TestStoreRequest(TopicId topicId, List<String> payloadList) {
            this(topicId, payloadList.iterator());
        }

        /** Creates a request passing {@code false} and {@code -1L} to the full constructor. */
        protected TestStoreRequest(TopicId topicId, Iterator<String> payloadIterator) {
            this(topicId, false, -1L, payloadIterator);
        }

        /**
         * Creates a request, forwarding {@code flag} and {@code pointer} unchanged to the
         * {@link StoreRequest} constructor.
         * NOTE(review): these are presumably the transactional flag and the transaction
         * write pointer - confirm against StoreRequest.
         */
        protected TestStoreRequest(TopicId topicId, boolean flag, long pointer, Iterator<String> payloadIterator) {
            super(topicId, flag, pointer);
            payloads = payloadIterator;
        }

        /**
         * Returns the bytes of the next payload string, or {@code null} once the
         * iterator is exhausted.
         */
        @Nullable
        protected byte[] doComputeNext() {
            return payloads.hasNext() ? Bytes.toBytes(payloads.next()) : null;
        }
    }

    /**
     * A {@link StoreRequestWriter} for tests that keeps every written entry in an
     * in-memory multimap keyed by topic, and can optionally sleep after each
     * {@code doWrite} call to simulate a slow store.
     */
    private static final class TestStoreRequestWriter extends StoreRequestWriter<TestEntry> {
        private final ListMultimap<TopicId, RawMessage> messages = ArrayListMultimap.create();
        private final long writeDelayMillis;

        /** Creates a writer that does not sleep after writing. */
        TestStoreRequestWriter(TimeProvider timeProvider) {
            this(timeProvider, 0L);
        }

        /**
         * Creates a writer that sleeps after each {@code doWrite} call.
         *
         * @param timeProvider passed through to the superclass
         * @param writeDelayMillis milliseconds to sleep after each {@code doWrite}; values
         *                         that are zero or negative disable the sleep
         */
        TestStoreRequestWriter(TimeProvider timeProvider, long writeDelayMillis) {
            // NOTE(review): the meaning of the 'false' flag is defined by StoreRequestWriter - confirm there.
            super(timeProvider, false);
            this.writeDelayMillis = writeDelayMillis;
        }

        /**
         * Builds a {@link TestEntry} carrying the given values and the topic id of the metadata.
         * The decompiler had renamed this method to {@code m6getEntry} (bridge-method merge);
         * the original name is restored so that it matches the superclass method again.
         */
        public TestEntry getEntry(TopicMetadata topicMetadata, boolean transactional, long transactionWritePointer,
                                  long writeTimestamp, short sequenceId, @Nullable byte[] payload) {
            return new TestEntry(topicMetadata.getTopicId(), transactional, transactionWritePointer,
                                 writeTimestamp, sequenceId, payload);
        }

        /**
         * Appends one {@link RawMessage} per entry to the in-memory multimap, then sleeps
         * for the configured delay, if any.
         */
        protected void doWrite(Iterator<TestEntry> entries) throws IOException {
            while (entries.hasNext()) {
                TestEntry entry = entries.next();
                // NOTE(review): 20 presumably matches the raw id layout written by putRawId
                // (long + short + long + short) - confirm against MessageId.
                byte[] rawId = new byte[20];
                MessageId.putRawId(entry.getWriteTimestamp(), entry.getSequenceId(), 0L, (short) 0, rawId, 0);
                byte[] payload = entry.getPayload();
                // Store a copy so the recorded message does not share the entry's payload array.
                byte[] payloadCopy = payload == null ? null : Arrays.copyOf(payload, payload.length);
                messages.put(entry.getTopicId(), new RawMessage(rawId, payloadCopy));
            }
            if (writeDelayMillis > 0) {
                Uninterruptibles.sleepUninterruptibly(writeDelayMillis, TimeUnit.MILLISECONDS);
            }
        }

        /** Returns the live multimap of everything written so far (not a copy). */
        ListMultimap<TopicId, RawMessage> getMessages() {
            return messages;
        }

        /** Nothing to release. */
        public void close() throws IOException {
        }
    }

    /**
     * Persists one request to each of two topics through the same writer and checks
     * that each topic received its three payloads in order. The first request's
     * messages are expected to carry publish timestamp 0 and the second's 1.
     */
    @Test
    public void testBasic() throws IOException {
        TopicId topicId1 = new NamespaceId("ns1").topic("t1");
        TopicId topicId2 = new NamespaceId("ns2").topic("t2");
        TopicMetadata metadata1 = new TopicMetadata(topicId1, new Object[]{new HashMap<String, String>(), 1});
        TopicMetadata metadata2 = new TopicMetadata(topicId2, new Object[]{new HashMap<String, String>(), 1});
        TestStoreRequestWriter testWriter = new TestStoreRequestWriter(new TimeProvider.IncrementalTimeProvider());
        ConcurrentMessageWriter writer = new ConcurrentMessageWriter(testWriter);

        // First request: three payloads to topic 1.
        writer.persist(new TestStoreRequest(topicId1, Arrays.asList("1", "2", "3")), metadata1);
        List<RawMessage> messages = testWriter.getMessages().get(topicId1);
        Assert.assertEquals(3L, messages.size());
        List<String> payloads = new ArrayList<String>();
        for (RawMessage message : messages) {
            Assert.assertEquals(0L, new MessageId(message.getId()).getPublishTimestamp());
            payloads.add(Bytes.toString(message.getPayload()));
        }
        Assert.assertEquals(Arrays.asList("1", "2", "3"), payloads);

        // Second request: three payloads to topic 2, written through the same writer.
        writer.persist(new TestStoreRequest(topicId2, Arrays.asList("a", "b", "c")), metadata2);
        messages = testWriter.getMessages().get(topicId2);
        Assert.assertEquals(3L, messages.size());
        payloads.clear();
        for (RawMessage message : messages) {
            Assert.assertEquals(1L, new MessageId(message.getId()).getPublishTimestamp());
            payloads.add(Bytes.toString(message.getPayload()));
        }
        Assert.assertEquals(Arrays.asList("a", "b", "c"), payloads);
    }

    /**
     * Persists a single request with 65537 payloads and checks that the first 65536
     * messages share publish timestamp 0 with sequence ids equal to {@code (short) index},
     * while the last message has publish timestamp 1.
     */
    @Test
    public void testMaxSequence() throws IOException {
        // Number of messages the assertions below expect to share one publish timestamp.
        final int sequenceIdLimit = 65536;
        final int msgCount = sequenceIdLimit + 1;

        List<String> payloads = new ArrayList<String>(msgCount);
        for (int i = 0; i < msgCount; i++) {
            payloads.add(Integer.toString(i));
        }

        TopicId topicId = new NamespaceId("ns1").topic("t1");
        TopicMetadata metadata = new TopicMetadata(topicId, new Object[]{new HashMap<String, String>(), 1});
        TestStoreRequestWriter testWriter = new TestStoreRequestWriter(new TimeProvider.IncrementalTimeProvider());
        new ConcurrentMessageWriter(testWriter).persist(new TestStoreRequest(topicId, payloads), metadata);

        List<RawMessage> messages = testWriter.getMessages().get(topicId);
        Assert.assertEquals(msgCount, messages.size());

        for (int i = 0; i < sequenceIdLimit; i++) {
            MessageId id = new MessageId(messages.get(i).getId());
            Assert.assertEquals(0L, id.getPublishTimestamp());
            Assert.assertEquals((short) i, id.getSequenceId());
        }

        // The one message beyond the limit must have moved on to the next timestamp.
        MessageId lastId = new MessageId(messages.get(msgCount - 1).getId());
        Assert.assertEquals(1L, lastId.getPublishTimestamp());
        // NOTE(review): this checks the payload sequence id, not getSequenceId() - confirm that is intended.
        Assert.assertEquals(0L, lastId.getPayloadSequenceId());
    }

    /**
     * Persists three requests of 43690 payloads each from three threads and checks the
     * 131070 recorded messages: message {@code i} is expected to have publish timestamp
     * {@code i / 65536}, sequence id {@code (short) (i % 65536)}, and the payloads are
     * expected to run 0..43689 three times in a row.
     */
    @Test
    public void testMultiMaxSequence() throws IOException, InterruptedException {
        final int sequenceIdLimit = 65536;
        final int requestCount = 3;
        final int msgCount = 43690;

        TopicId topicId = new NamespaceId("ns1").topic("t1");
        final TopicMetadata metadata = new TopicMetadata(topicId, new Object[]{new HashMap<String, String>(), 1});

        List<StoreRequest> requests = new ArrayList<StoreRequest>(requestCount);
        for (int i = 0; i < requestCount; i++) {
            List<String> payloads = new ArrayList<String>(msgCount);
            for (int j = 0; j < msgCount; j++) {
                payloads.add(Integer.toString(j));
            }
            requests.add(new TestStoreRequest(topicId, payloads));
        }

        TestStoreRequestWriter testWriter = new TestStoreRequestWriter(new TimeProvider.IncrementalTimeProvider());
        final CountDownLatch latch = new CountDownLatch(requestCount);
        final ConcurrentMessageWriter writer = new ConcurrentMessageWriter(testWriter, new MetricsCollector() {
            public void increment(String metricName, long value) {
                // Hold each caller at the "persist.requested" metric until all three have reached it.
                if ("persist.requested".equals(metricName)) {
                    latch.countDown();
                    Uninterruptibles.awaitUninterruptibly(latch);
                }
            }

            public void gauge(String metricName, long value) {
                LOG.info("MetricsContext.gauge: {} = {}", metricName, value);
            }
        });

        ExecutorService executor = Executors.newFixedThreadPool(requestCount);
        try {
            for (final StoreRequest request : requests) {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            writer.persist(request, metadata);
                        } catch (IOException e) {
                            // A failed persist shows up as a count mismatch in the assertions below.
                            LOG.error("Failed to persist", e);
                        }
                    }
                });
            }
            executor.shutdown();
            Assert.assertTrue(executor.awaitTermination(1L, TimeUnit.MINUTES));
        } finally {
            // No-op after a clean termination; otherwise avoids leaving worker threads behind.
            executor.shutdownNow();
        }

        List<RawMessage> messages = testWriter.getMessages().get(topicId);
        Assert.assertEquals(requestCount * msgCount, messages.size());

        int expectedPayload = 0;
        for (int i = 0; i < messages.size(); i++) {
            RawMessage message = messages.get(i);
            MessageId id = new MessageId(message.getId());
            Assert.assertEquals(i / sequenceIdLimit, id.getPublishTimestamp());
            Assert.assertEquals((short) (i % sequenceIdLimit), id.getSequenceId());
            Assert.assertEquals(expectedPayload, Integer.parseInt(Bytes.toString(message.getPayload())));
            expectedPayload = (expectedPayload + 1) % msgCount;
        }
    }

    /**
     * Has 20 threads each persist 20 requests of 200 payloads through one writer whose
     * store sleeps 50 ms per write, then checks that all 80000 messages were recorded
     * and that their raw ids are strictly increasing in recorded order.
     */
    @Test
    public void testConcurrentWrites() throws InterruptedException, BrokenBarrierException {
        final int payloadsPerRequest = 200;
        final int threadCount = 20;
        final int requestsPerThread = 20;

        final TopicId topicId = NamespaceId.DEFAULT.topic("t");
        final TopicMetadata metadata = new TopicMetadata(topicId, new Object[]{new HashMap<String, String>(), 1});
        TestStoreRequestWriter testWriter = new TestStoreRequestWriter(new TimeProvider.IncrementalTimeProvider(), 50L);
        final ConcurrentMessageWriter writer = new ConcurrentMessageWriter(testWriter);

        final List<String> payloads = new ArrayList<String>(payloadsPerRequest);
        for (int i = 0; i < payloadsPerRequest; i++) {
            payloads.add(Integer.toString(i));
        }

        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        // One extra party for the test thread, so all workers start together when it arrives.
        final CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
        Stopwatch totalTime = new Stopwatch();
        try {
            for (int i = 0; i < threadCount; i++) {
                final int threadId = i;
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        Stopwatch stopwatch = new Stopwatch();
                        try {
                            barrier.await();
                            stopwatch.start();
                            for (int j = 0; j < requestsPerThread; j++) {
                                writer.persist(new TestStoreRequest(topicId, payloads), metadata);
                            }
                            LOG.info("Complete time for thread {} is {} ms", threadId, stopwatch.elapsedMillis());
                        } catch (Exception e) {
                            // A failed persist shows up as a count mismatch in the assertions below.
                            LOG.error("Exception raised when persisting.", e);
                        }
                    }
                });
            }
            barrier.await();
            totalTime.start();
            executor.shutdown();
            Assert.assertTrue(executor.awaitTermination(1L, TimeUnit.MINUTES));
        } finally {
            // No-op after a clean termination; otherwise avoids leaving worker threads behind.
            executor.shutdownNow();
        }
        LOG.info("Total time passed: {} ms", totalTime.elapsedMillis());

        List<RawMessage> messages = testWriter.getMessages().get(topicId);
        Assert.assertEquals(payloadsPerRequest * threadCount * requestsPerThread, messages.size());

        // Every message id must sort strictly after the one recorded before it.
        RawMessage previous = null;
        for (RawMessage message : messages) {
            if (previous != null) {
                Assert.assertTrue(Bytes.compareTo(previous.getId(), message.getId()) < 0);
            }
            previous = message;
        }
    }
}
