package co.cask.cdap.messaging.store;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.messaging.store.MessageTable;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.tephra.Transaction;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/messaging/store/MessageTableTest.class */
public abstract class MessageTableTest {
    private static final Logger LOG = LoggerFactory.getLogger(MessageTableTest.class);
    private static final TopicId T1 = NamespaceId.DEFAULT.topic("messaget1");
    private static final TopicId T2 = NamespaceId.DEFAULT.topic("messaget2");
    private static final int GENERATION = 1;
    private static final Map<String, String> DEFAULT_PROPERTY = ImmutableMap.of("ttl", Integer.toString(10000), "generation", Integer.toString(GENERATION));
    private static final TopicMetadata M1 = new TopicMetadata(T1, DEFAULT_PROPERTY);
    private static final TopicMetadata M2 = new TopicMetadata(T2, DEFAULT_PROPERTY);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/messaging/store/MessageTableTest$TestMessageEntry.class */
    public static class TestMessageEntry implements MessageTable.Entry {
        private final TopicId topicId;
        private final int generation;
        private final Long transactionWritePointer;
        private final byte[] payload;
        private final long publishTimestamp;
        private final short sequenceId;

        TestMessageEntry(TopicId topicId, int i, long j, int i2, @Nullable Long l, @Nullable byte[] bArr) {
            this.topicId = topicId;
            this.generation = i;
            this.transactionWritePointer = l;
            this.publishTimestamp = j;
            this.sequenceId = (short) i2;
            this.payload = bArr;
        }

        public TopicId getTopicId() {
            return this.topicId;
        }

        public int getGeneration() {
            return this.generation;
        }

        public boolean isPayloadReference() {
            return this.payload == null;
        }

        public boolean isTransactional() {
            return this.transactionWritePointer != null;
        }

        public long getTransactionWritePointer() {
            if (this.transactionWritePointer == null) {
                return -1L;
            }
            return this.transactionWritePointer.longValue();
        }

        @Nullable
        public byte[] getPayload() {
            return this.payload;
        }

        public long getPublishTimestamp() {
            return this.publishTimestamp;
        }

        public short getSequenceId() {
            return this.sequenceId;
        }
    }

    /* loaded from: input_file:co/cask/cdap/messaging/store/MessageTableTest$TestRollbackDetail.class */
    private static class TestRollbackDetail implements RollbackDetail {
        private final long txWritePtr;
        private final long startTimestamp;
        private final int startSeqId;
        private final long endTimestamp;
        private final int endSeqId;

        TestRollbackDetail(long j, long j2, int i, long j3, int i2) {
            this.txWritePtr = j;
            this.startTimestamp = j2;
            this.startSeqId = i;
            this.endTimestamp = j3;
            this.endSeqId = i2;
        }

        public long getTransactionWritePointer() {
            return this.txWritePtr;
        }

        public long getStartTimestamp() {
            return this.startTimestamp;
        }

        public int getStartSequenceId() {
            return this.startSeqId;
        }

        public long getEndTimestamp() {
            return this.endTimestamp;
        }

        public int getEndSequenceId() {
            return this.endSeqId;
        }
    }

    protected abstract MessageTable getMessageTable() throws Exception;

    protected abstract MetadataTable getMetadataTable() throws Exception;

    /* JADX WARN: Finally extract failed */
    @Test
    public void testSingleMessage() throws Exception {
        TopicId topicId = NamespaceId.DEFAULT.topic("singleMessage");
        TopicMetadata topicMetadata = new TopicMetadata(topicId, DEFAULT_PROPERTY);
        MessageTable messageTable = getMessageTable();
        Throwable th = null;
        try {
            MetadataTable metadataTable = getMetadataTable();
            Throwable th2 = null;
            try {
                metadataTable.createTopic(topicMetadata);
                ArrayList arrayList = new ArrayList();
                arrayList.add(new TestMessageEntry(topicId, GENERATION, 0L, 0, 123L, Bytes.toBytes("data")));
                messageTable.store(arrayList.iterator());
                byte[] bArr = new byte[20];
                MessageId.putRawId(0L, (short) 0, 0L, (short) 0, bArr, 0);
                CloseableIterator fetch = messageTable.fetch(topicMetadata, new MessageId(bArr), false, 50, (Transaction) null);
                Throwable th3 = null;
                try {
                    try {
                        Assert.assertFalse(fetch.hasNext());
                        if (fetch != null) {
                            if (0 != 0) {
                                try {
                                    fetch.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                        CloseableIterator fetch2 = messageTable.fetch(topicMetadata, new MessageId(bArr), true, 50, (Transaction) null);
                        Throwable th5 = null;
                        try {
                            Assert.assertTrue(fetch2.hasNext());
                            Assert.assertArrayEquals(Bytes.toBytes("data"), ((MessageTable.Entry) fetch2.next()).getPayload());
                            Assert.assertFalse(fetch2.hasNext());
                            if (fetch2 != null) {
                                if (0 != 0) {
                                    try {
                                        fetch2.close();
                                    } catch (Throwable th6) {
                                        th5.addSuppressed(th6);
                                    }
                                } else {
                                    fetch2.close();
                                }
                            }
                            CloseableIterator fetch3 = messageTable.fetch(topicMetadata, 0L, 50, (Transaction) null);
                            Throwable th7 = null;
                            try {
                                try {
                                    Assert.assertArrayEquals(Bytes.toBytes("data"), ((MessageTable.Entry) fetch3.next()).getPayload());
                                    Assert.assertFalse(fetch3.hasNext());
                                    if (fetch3 != null) {
                                        if (0 != 0) {
                                            try {
                                                fetch3.close();
                                            } catch (Throwable th8) {
                                                th7.addSuppressed(th8);
                                            }
                                        } else {
                                            fetch3.close();
                                        }
                                    }
                                    messageTable.rollback(topicMetadata, new TestRollbackDetail(123L, 0L, 0, 0L, 0));
                                    fetch = messageTable.fetch(topicMetadata, new MessageId(bArr), true, 50, (Transaction) null);
                                    Throwable th9 = null;
                                    try {
                                        try {
                                            Assert.assertArrayEquals(Bytes.toBytes("data"), ((MessageTable.Entry) fetch.next()).getPayload());
                                            Assert.assertFalse(fetch.hasNext());
                                            if (fetch != null) {
                                                if (0 != 0) {
                                                    try {
                                                        fetch.close();
                                                    } catch (Throwable th10) {
                                                        th9.addSuppressed(th10);
                                                    }
                                                } else {
                                                    fetch.close();
                                                }
                                            }
                                            CloseableIterator fetch4 = messageTable.fetch(topicMetadata, new MessageId(bArr), true, 50, new Transaction(200L, 200L, new long[0], new long[0], -1L));
                                            Throwable th11 = null;
                                            try {
                                                Assert.assertFalse(fetch4.hasNext());
                                                if (fetch4 != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            fetch4.close();
                                                        } catch (Throwable th12) {
                                                            th11.addSuppressed(th12);
                                                        }
                                                    } else {
                                                        fetch4.close();
                                                    }
                                                }
                                                if (metadataTable != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            metadataTable.close();
                                                        } catch (Throwable th13) {
                                                            th2.addSuppressed(th13);
                                                        }
                                                    } else {
                                                        metadataTable.close();
                                                    }
                                                }
                                                if (messageTable != null) {
                                                    if (0 == 0) {
                                                        messageTable.close();
                                                        return;
                                                    }
                                                    try {
                                                        messageTable.close();
                                                    } catch (Throwable th14) {
                                                        th.addSuppressed(th14);
                                                    }
                                                }
                                            } catch (Throwable th15) {
                                                if (fetch4 != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            fetch4.close();
                                                        } catch (Throwable th16) {
                                                            th11.addSuppressed(th16);
                                                        }
                                                    } else {
                                                        fetch4.close();
                                                    }
                                                }
                                                throw th15;
                                            }
                                        } catch (Throwable th17) {
                                            th9 = th17;
                                            throw th17;
                                        }
                                    } finally {
                                    }
                                } catch (Throwable th18) {
                                    th7 = th18;
                                    throw th18;
                                }
                            } catch (Throwable th19) {
                                if (fetch3 != null) {
                                    if (th7 != null) {
                                        try {
                                            fetch3.close();
                                        } catch (Throwable th20) {
                                            th7.addSuppressed(th20);
                                        }
                                    } else {
                                        fetch3.close();
                                    }
                                }
                                throw th19;
                            }
                        } catch (Throwable th21) {
                            if (fetch2 != null) {
                                if (0 != 0) {
                                    try {
                                        fetch2.close();
                                    } catch (Throwable th22) {
                                        th5.addSuppressed(th22);
                                    }
                                } else {
                                    fetch2.close();
                                }
                            }
                            throw th21;
                        }
                    } catch (Throwable th23) {
                        th3 = th23;
                        throw th23;
                    }
                } finally {
                }
            } catch (Throwable th24) {
                if (metadataTable != null) {
                    if (0 != 0) {
                        try {
                            metadataTable.close();
                        } catch (Throwable th25) {
                            th2.addSuppressed(th25);
                        }
                    } else {
                        metadataTable.close();
                    }
                }
                throw th24;
            }
        } catch (Throwable th26) {
            if (messageTable != null) {
                if (0 != 0) {
                    try {
                        messageTable.close();
                    } catch (Throwable th27) {
                        th.addSuppressed(th27);
                    }
                } else {
                    messageTable.close();
                }
            }
            throw th26;
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r16v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r16v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 16, insn: 0x071a: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r16 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:346:0x071a */
    /* JADX WARN: Not initialized variable reg: 17, insn: 0x071e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r17 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:348:0x071e */
    /* JADX WARN: Type inference failed for: r16v0, types: [co.cask.cdap.messaging.store.MetadataTable] */
    /* JADX WARN: Type inference failed for: r17v0, types: [java.lang.Throwable] */
    @Test
    public void testNonTxAndTxConsumption() throws Exception {
        ?? r16;
        ?? r17;
        CloseableIterator<MessageTable.Entry> fetch;
        Throwable th;
        CloseableIterator<MessageTable.Entry> fetch2;
        Throwable th2;
        CloseableIterator<MessageTable.Entry> fetch3;
        Throwable th3;
        MessageTable messageTable = getMessageTable();
        Throwable th4 = null;
        try {
            try {
                MetadataTable metadataTable = getMetadataTable();
                Throwable th5 = null;
                metadataTable.createTopic(M1);
                metadataTable.createTopic(M2);
                ArrayList arrayList = new ArrayList();
                HashMap hashMap = new HashMap();
                HashMap hashMap2 = new HashMap();
                long populateList = populateList(arrayList, Arrays.asList(100L, 101L, 102L), hashMap, hashMap2);
                messageTable.store(arrayList.iterator());
                CloseableIterator<MessageTable.Entry> fetch4 = messageTable.fetch(M1, 0L, Integer.MAX_VALUE, (Transaction) null);
                Throwable th6 = null;
                try {
                    try {
                        checkPointerCount(fetch4, 123, ImmutableSet.of(100L, 101L, 102L), 150);
                        if (fetch4 != null) {
                            if (0 != 0) {
                                try {
                                    fetch4.close();
                                } catch (Throwable th7) {
                                    th6.addSuppressed(th7);
                                }
                            } else {
                                fetch4.close();
                            }
                        }
                        CloseableIterator<MessageTable.Entry> fetch5 = messageTable.fetch(M1, 0L, 85, (Transaction) null);
                        Throwable th8 = null;
                        try {
                            try {
                                checkPointerCount(fetch5, 123, ImmutableSet.of(100L, 101L, 102L), 85);
                                if (fetch5 != null) {
                                    if (0 != 0) {
                                        try {
                                            fetch5.close();
                                        } catch (Throwable th9) {
                                            th8.addSuppressed(th9);
                                        }
                                    } else {
                                        fetch5.close();
                                    }
                                }
                                fetch = messageTable.fetch(M1, 0L, Integer.MAX_VALUE, new Transaction(200L, 200L, new long[0], new long[0], -1L));
                                th = null;
                            } catch (Throwable th10) {
                                th8 = th10;
                                throw th10;
                            }
                        } finally {
                            if (fetch5 != null) {
                                if (th8 != null) {
                                    try {
                                        fetch5.close();
                                    } catch (Throwable th11) {
                                        th8.addSuppressed(th11);
                                    }
                                } else {
                                    fetch5.close();
                                }
                            }
                        }
                    } catch (Throwable th12) {
                        th6 = th12;
                        throw th12;
                    }
                    try {
                        try {
                            checkPointerCount(fetch, 123, ImmutableSet.of(100L, 101L, 102L), 150);
                            if (fetch != null) {
                                if (0 != 0) {
                                    try {
                                        fetch.close();
                                    } catch (Throwable th13) {
                                        th.addSuppressed(th13);
                                    }
                                } else {
                                    fetch.close();
                                }
                            }
                            fetch2 = messageTable.fetch(M1, 0L, Integer.MAX_VALUE, new Transaction(200L, 200L, new long[]{101}, new long[0], -1L));
                            th2 = null;
                        } catch (Throwable th14) {
                            th = th14;
                            throw th14;
                        }
                        try {
                            checkPointerCount(fetch2, 123, ImmutableSet.of(100L, 102L), 100);
                            if (fetch2 != null) {
                                if (0 != 0) {
                                    try {
                                        fetch2.close();
                                    } catch (Throwable th15) {
                                        th2.addSuppressed(th15);
                                    }
                                } else {
                                    fetch2.close();
                                }
                            }
                            Transaction transaction = new Transaction(100L, 100L, new long[0], new long[]{101}, -1L);
                            CloseableIterator<MessageTable.Entry> fetch6 = messageTable.fetch(M1, 0L, Integer.MAX_VALUE, transaction);
                            Throwable th16 = null;
                            try {
                                checkPointerCount(fetch6, 123, ImmutableSet.of(100L), 50);
                                if (fetch6 != null) {
                                    if (0 != 0) {
                                        try {
                                            fetch6.close();
                                        } catch (Throwable th17) {
                                            th16.addSuppressed(th17);
                                        }
                                    } else {
                                        fetch6.close();
                                    }
                                }
                                CloseableIterator<MessageTable.Entry> fetch7 = messageTable.fetch(M1, 0L, 10, transaction);
                                Throwable th18 = null;
                                try {
                                    try {
                                        checkPointerCount(fetch7, 123, ImmutableSet.of(100L), 10);
                                        if (fetch7 != null) {
                                            if (0 != 0) {
                                                try {
                                                    fetch7.close();
                                                } catch (Throwable th19) {
                                                    th18.addSuppressed(th19);
                                                }
                                            } else {
                                                fetch7.close();
                                            }
                                        }
                                        fetch3 = messageTable.fetch(M2, 0L, Integer.MAX_VALUE, (Transaction) null);
                                        th3 = null;
                                    } catch (Throwable th20) {
                                        th18 = th20;
                                        throw th20;
                                    }
                                    try {
                                        checkPointerCount(fetch3, 321, ImmutableSet.of(100L, 101L, 102L), 150);
                                        if (fetch3 != null) {
                                            if (0 != 0) {
                                                try {
                                                    fetch3.close();
                                                } catch (Throwable th21) {
                                                    th3.addSuppressed(th21);
                                                }
                                            } else {
                                                fetch3.close();
                                            }
                                        }
                                        messageTable.rollback(M1, new TestRollbackDetail(101L, populateList, hashMap.get(101L).shortValue(), populateList, hashMap2.get(101L).shortValue()));
                                        fetch7 = messageTable.fetch(M1, 0L, Integer.MAX_VALUE, (Transaction) null);
                                        Throwable th22 = null;
                                        try {
                                            try {
                                                checkPointerCount(fetch7, 123, ImmutableSet.of(100L, 101L, 102L), 150);
                                                if (fetch7 != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            fetch7.close();
                                                        } catch (Throwable th23) {
                                                            th22.addSuppressed(th23);
                                                        }
                                                    } else {
                                                        fetch7.close();
                                                    }
                                                }
                                                messageTable.rollback(M1, new TestRollbackDetail(100L, populateList, hashMap.get(100L).shortValue(), populateList, hashMap2.get(100L).shortValue()));
                                                Transaction transaction2 = new Transaction(200L, 200L, new long[0], new long[0], -1L);
                                                fetch4 = messageTable.fetch(M1, 0L, Integer.MAX_VALUE, transaction2);
                                                Throwable th24 = null;
                                                try {
                                                    try {
                                                        checkPointerCount(fetch4, 123, ImmutableSet.of(102L), 50);
                                                        if (fetch4 != null) {
                                                            if (0 != 0) {
                                                                try {
                                                                    fetch4.close();
                                                                } catch (Throwable th25) {
                                                                    th24.addSuppressed(th25);
                                                                }
                                                            } else {
                                                                fetch4.close();
                                                            }
                                                        }
                                                        CloseableIterator<MessageTable.Entry> fetch8 = messageTable.fetch(M2, 0L, Integer.MAX_VALUE, transaction2);
                                                        Throwable th26 = null;
                                                        try {
                                                            checkPointerCount(fetch8, 321, ImmutableSet.of(100L, 101L, 102L), 150);
                                                            if (fetch8 != null) {
                                                                if (0 != 0) {
                                                                    try {
                                                                        fetch8.close();
                                                                    } catch (Throwable th27) {
                                                                        th26.addSuppressed(th27);
                                                                    }
                                                                } else {
                                                                    fetch8.close();
                                                                }
                                                            }
                                                            if (metadataTable != null) {
                                                                if (0 != 0) {
                                                                    try {
                                                                        metadataTable.close();
                                                                    } catch (Throwable th28) {
                                                                        th5.addSuppressed(th28);
                                                                    }
                                                                } else {
                                                                    metadataTable.close();
                                                                }
                                                            }
                                                            if (messageTable != null) {
                                                                if (0 == 0) {
                                                                    messageTable.close();
                                                                    return;
                                                                }
                                                                try {
                                                                    messageTable.close();
                                                                } catch (Throwable th29) {
                                                                    th4.addSuppressed(th29);
                                                                }
                                                            }
                                                        } catch (Throwable th30) {
                                                            if (fetch8 != null) {
                                                                if (0 != 0) {
                                                                    try {
                                                                        fetch8.close();
                                                                    } catch (Throwable th31) {
                                                                        th26.addSuppressed(th31);
                                                                    }
                                                                } else {
                                                                    fetch8.close();
                                                                }
                                                            }
                                                            throw th30;
                                                        }
                                                    } catch (Throwable th32) {
                                                        th24 = th32;
                                                        throw th32;
                                                    }
                                                } finally {
                                                }
                                            } catch (Throwable th33) {
                                                th22 = th33;
                                                throw th33;
                                            }
                                        } finally {
                                        }
                                    } catch (Throwable th34) {
                                        if (fetch3 != null) {
                                            if (0 != 0) {
                                                try {
                                                    fetch3.close();
                                                } catch (Throwable th35) {
                                                    th3.addSuppressed(th35);
                                                }
                                            } else {
                                                fetch3.close();
                                            }
                                        }
                                        throw th34;
                                    }
                                } finally {
                                    if (fetch7 != null) {
                                        if (th18 != null) {
                                            try {
                                                fetch7.close();
                                            } catch (Throwable th36) {
                                                th18.addSuppressed(th36);
                                            }
                                        } else {
                                            fetch7.close();
                                        }
                                    }
                                }
                            } catch (Throwable th37) {
                                if (fetch6 != null) {
                                    if (0 != 0) {
                                        try {
                                            fetch6.close();
                                        } catch (Throwable th38) {
                                            th16.addSuppressed(th38);
                                        }
                                    } else {
                                        fetch6.close();
                                    }
                                }
                                throw th37;
                            }
                        } catch (Throwable th39) {
                            if (fetch2 != null) {
                                if (0 != 0) {
                                    try {
                                        fetch2.close();
                                    } catch (Throwable th40) {
                                        th2.addSuppressed(th40);
                                    }
                                } else {
                                    fetch2.close();
                                }
                            }
                            throw th39;
                        }
                    } finally {
                        if (fetch != null) {
                            if (th != null) {
                                try {
                                    fetch.close();
                                } catch (Throwable th41) {
                                    th.addSuppressed(th41);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                    }
                } finally {
                    if (fetch4 != null) {
                        if (th6 != null) {
                            try {
                                fetch4.close();
                            } catch (Throwable th42) {
                                th6.addSuppressed(th42);
                            }
                        } else {
                            fetch4.close();
                        }
                    }
                }
            } catch (Throwable th43) {
                if (messageTable != null) {
                    if (0 != 0) {
                        try {
                            messageTable.close();
                        } catch (Throwable th44) {
                            th4.addSuppressed(th44);
                        }
                    } else {
                        messageTable.close();
                    }
                }
                throw th43;
            }
        } catch (Throwable th45) {
            if (r16 != 0) {
                if (r17 != 0) {
                    try {
                        r16.close();
                    } catch (Throwable th46) {
                        r17.addSuppressed(th46);
                    }
                } else {
                    r16.close();
                }
            }
            throw th45;
        }
    }

    @Test
    public void testEmptyPayload() throws Exception {
        TopicId topicId = NamespaceId.DEFAULT.topic("testEmptyPayload");
        TopicMetadata topicMetadata = new TopicMetadata(topicId, DEFAULT_PROPERTY);
        MessageTable messageTable = getMessageTable();
        Throwable th = null;
        try {
            MetadataTable metadataTable = getMetadataTable();
            Throwable th2 = null;
            try {
                metadataTable.createTopic(topicMetadata);
                try {
                    messageTable.store(Collections.singleton(new TestMessageEntry(topicId, GENERATION, 1L, 0, null, null)).iterator());
                    Assert.fail("Expected IllegalArgumentException");
                } catch (IllegalArgumentException e) {
                }
                messageTable.store(Collections.singleton(new TestMessageEntry(topicId, GENERATION, 1L, 0, 2L, null)).iterator());
                ArrayList arrayList = new ArrayList();
                CloseableIterator fetch = messageTable.fetch(topicMetadata, 0L, Integer.MAX_VALUE, (Transaction) null);
                Throwable th3 = null;
                try {
                    try {
                        Iterators.addAll(arrayList, fetch);
                        if (fetch != null) {
                            if (0 != 0) {
                                try {
                                    fetch.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                        Assert.assertEquals(1L, arrayList.size());
                        MessageTable.Entry entry = (MessageTable.Entry) arrayList.get(0);
                        Assert.assertEquals(1L, entry.getPublishTimestamp());
                        Assert.assertEquals(0L, entry.getSequenceId());
                        Assert.assertTrue(entry.isTransactional());
                        Assert.assertEquals(2L, entry.getTransactionWritePointer());
                        Assert.assertNull(entry.getPayload());
                        Assert.assertTrue(entry.isPayloadReference());
                        if (metadataTable != null) {
                            if (0 != 0) {
                                try {
                                    metadataTable.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                metadataTable.close();
                            }
                        }
                        if (messageTable != null) {
                            if (0 == 0) {
                                messageTable.close();
                                return;
                            }
                            try {
                                messageTable.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        }
                    } catch (Throwable th7) {
                        th3 = th7;
                        throw th7;
                    }
                } catch (Throwable th8) {
                    if (fetch != null) {
                        if (th3 != null) {
                            try {
                                fetch.close();
                            } catch (Throwable th9) {
                                th3.addSuppressed(th9);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    throw th8;
                }
            } catch (Throwable th10) {
                if (metadataTable != null) {
                    if (0 != 0) {
                        try {
                            metadataTable.close();
                        } catch (Throwable th11) {
                            th2.addSuppressed(th11);
                        }
                    } else {
                        metadataTable.close();
                    }
                }
                throw th10;
            }
        } catch (Throwable th12) {
            if (messageTable != null) {
                if (0 != 0) {
                    try {
                        messageTable.close();
                    } catch (Throwable th13) {
                        th.addSuppressed(th13);
                    }
                } else {
                    messageTable.close();
                }
            }
            throw th12;
        }
    }

    @Test
    public void testConcurrentWrites() throws Exception {
        ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        for (int i = 0; i < 2; i += GENERATION) {
            final TopicId topicId = NamespaceId.DEFAULT.topic("testConcurrentWrites" + i);
            TopicMetadata topicMetadata = new TopicMetadata(topicId, DEFAULT_PROPERTY);
            MetadataTable metadataTable = getMetadataTable();
            Throwable th = null;
            try {
                try {
                    metadataTable.createTopic(topicMetadata);
                    if (metadataTable != null) {
                        if (0 != 0) {
                            try {
                                metadataTable.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            metadataTable.close();
                        }
                    }
                    final int i2 = i;
                    listeningDecorator.submit(new Callable<Void>() { // from class: co.cask.cdap.messaging.store.MessageTableTest.1
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // java.util.concurrent.Callable
                        public Void call() throws Exception {
                            try {
                                MessageTable messageTable = MessageTableTest.this.getMessageTable();
                                Throwable th3 = null;
                                try {
                                    messageTable.store(new AbstractIterator<MessageTable.Entry>() { // from class: co.cask.cdap.messaging.store.MessageTableTest.1.1
                                        int messageCount = 0;

                                        /* JADX INFO: Access modifiers changed from: protected */
                                        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
                                        public MessageTable.Entry m5computeNext() {
                                            if (this.messageCount >= 2) {
                                                return (MessageTable.Entry) endOfData();
                                            }
                                            try {
                                                cyclicBarrier.await();
                                                TopicId topicId2 = topicId;
                                                long currentTimeMillis = System.currentTimeMillis();
                                                int i3 = this.messageCount;
                                                StringBuilder append = new StringBuilder().append("message ").append(i2).append(" ");
                                                int i4 = this.messageCount;
                                                this.messageCount = i4 + MessageTableTest.GENERATION;
                                                return new TestMessageEntry(topicId2, MessageTableTest.GENERATION, currentTimeMillis, i3, null, Bytes.toBytes(append.append(i4).toString()));
                                            } catch (Exception e) {
                                                throw Throwables.propagate(e);
                                            }
                                        }
                                    });
                                    countDownLatch.countDown();
                                    if (messageTable != null) {
                                        if (0 != 0) {
                                            try {
                                                messageTable.close();
                                            } catch (Throwable th4) {
                                                th3.addSuppressed(th4);
                                            }
                                        } else {
                                            messageTable.close();
                                        }
                                    }
                                    return null;
                                } finally {
                                }
                            } catch (Exception e) {
                                MessageTableTest.LOG.error("Failed to store to MessageTable", e);
                                return null;
                            }
                        }
                    });
                } catch (Throwable th3) {
                    if (metadataTable != null) {
                        if (th != null) {
                            try {
                                metadataTable.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            metadataTable.close();
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        }
        listeningDecorator.shutdown();
        Assert.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        for (int i3 = 0; i3 < 2; i3 += GENERATION) {
            TopicMetadata topicMetadata2 = new TopicMetadata(NamespaceId.DEFAULT.topic("testConcurrentWrites" + i3), DEFAULT_PROPERTY);
            MessageTable messageTable = getMessageTable();
            Throwable th5 = null;
            try {
                CloseableIterator fetch = messageTable.fetch(topicMetadata2, 0L, 10, (Transaction) null);
                Throwable th6 = null;
                try {
                    try {
                        ArrayList<MessageTable.Entry> newArrayList = Lists.newArrayList(fetch);
                        Assert.assertEquals(2L, newArrayList.size());
                        int i4 = 0;
                        for (MessageTable.Entry entry : newArrayList) {
                            StringBuilder append = new StringBuilder().append("message ").append(i3).append(" ");
                            int i5 = i4;
                            i4 += GENERATION;
                            Assert.assertEquals(append.append(i5).toString(), Bytes.toString(entry.getPayload()));
                        }
                        if (fetch != null) {
                            if (0 != 0) {
                                try {
                                    fetch.close();
                                } catch (Throwable th7) {
                                    th6.addSuppressed(th7);
                                }
                            } else {
                                fetch.close();
                            }
                        }
                        if (messageTable != null) {
                            if (0 != 0) {
                                try {
                                    messageTable.close();
                                } catch (Throwable th8) {
                                    th5.addSuppressed(th8);
                                }
                            } else {
                                messageTable.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th9) {
                    if (fetch != null) {
                        if (th6 != null) {
                            try {
                                fetch.close();
                            } catch (Throwable th10) {
                                th6.addSuppressed(th10);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    throw th9;
                }
            } catch (Throwable th11) {
                if (messageTable != null) {
                    if (0 != 0) {
                        try {
                            messageTable.close();
                        } catch (Throwable th12) {
                            th5.addSuppressed(th12);
                        }
                    } else {
                        messageTable.close();
                    }
                }
                throw th11;
            }
        }
    }

    private void checkPointerCount(CloseableIterator<MessageTable.Entry> closeableIterator, int i, Set<Long> set, int i2) {
        int i3 = 0;
        while (closeableIterator.hasNext()) {
            MessageTable.Entry entry = (MessageTable.Entry) closeableIterator.next();
            Assert.assertArrayEquals(Bytes.toBytes(i), entry.getPayload());
            if (entry.isPayloadReference() || entry.isTransactional()) {
                Assert.assertTrue(set.contains(Long.valueOf(entry.getTransactionWritePointer())));
            }
            i3 += GENERATION;
        }
        Assert.assertEquals(i2, i3);
    }

    private long populateList(List<MessageTable.Entry> list, List<Long> list2, Map<Long, Short> map, Map<Long, Short> map2) {
        long currentTimeMillis = System.currentTimeMillis();
        short s = 0;
        for (Long l : list2) {
            map.put(l, Short.valueOf(s));
            for (int i = 0; i < 50; i += GENERATION) {
                short s2 = (short) (s + GENERATION);
                list.add(new TestMessageEntry(T1, GENERATION, currentTimeMillis, s, l, Bytes.toBytes(123)));
                TopicId topicId = T2;
                s = (short) (s2 + GENERATION);
                list.add(new TestMessageEntry(topicId, GENERATION, currentTimeMillis, s2, l, Bytes.toBytes(321)));
            }
            map2.put(l, Short.valueOf((short) (s - GENERATION)));
        }
        return currentTimeMillis;
    }
}
