package co.cask.cdap.data2.dataset2.lib.partitioned;

import co.cask.cdap.api.Predicate;
import co.cask.cdap.api.dataset.lib.Partition;
import co.cask.cdap.api.dataset.lib.PartitionDetail;
import co.cask.cdap.api.dataset.lib.PartitionFilter;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetProperties;
import co.cask.cdap.api.dataset.lib.Partitioning;
import co.cask.cdap.api.dataset.lib.partitioned.ConcurrentPartitionConsumer;
import co.cask.cdap.api.dataset.lib.partitioned.ConsumablePartition;
import co.cask.cdap.api.dataset.lib.partitioned.ConsumerConfiguration;
import co.cask.cdap.api.dataset.lib.partitioned.ConsumerWorkingSet;
import co.cask.cdap.api.dataset.lib.partitioned.PartitionAcceptor;
import co.cask.cdap.api.dataset.lib.partitioned.PartitionConsumerResult;
import co.cask.cdap.api.dataset.lib.partitioned.ProcessState;
import co.cask.cdap.api.dataset.lib.partitioned.StatePersistor;
import co.cask.cdap.data2.dataset2.DatasetFrameworkTestUtil;
import co.cask.cdap.proto.Id;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import org.apache.twill.filesystem.Location;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/data2/dataset2/lib/partitioned/PartitionConsumerTest.class */
public class PartitionConsumerTest {

    /** Class-level JUnit temporary folder rule. It is not referenced by any code visible in this part of the file. */
    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();

    /** Class-level test utility used to create, look up and delete dataset instances and to build transaction executors. */
    @ClassRule
    public static DatasetFrameworkTestUtil dsFrameworkUtil = new DatasetFrameworkTestUtil();
    /** Partitioning used by the dataset under test: a string field "s", an int field "i" and a long field "l". */
    private static final Partitioning PARTITIONING_1 = Partitioning.builder().addStringField("s").addIntField("i").addLongField("l").build();
    /** Id of the partitioned file set ("pfs") that is created before and deleted after every test. */
    private static final Id.DatasetInstance pfsInstance = Id.DatasetInstance.from(DatasetFrameworkTestUtil.NAMESPACE_ID, "pfs");
    /** Id of a second dataset ("ext"); in the visible code it is only looked up and deleted during tear-down. */
    private static final Id.DatasetInstance pfsExternalInstance = Id.DatasetInstance.from(DatasetFrameworkTestUtil.NAMESPACE_ID, "ext");
    /** Base location of the embedded file set of {@code pfsInstance}; assigned in {@code before()} and checked in {@code after()}. */
    private static Location pfsBaseLocation;
    // Per-test-instance counter. Its use is outside the visible code - presumably it backs generateUniqueKey(); TODO confirm.
    private int counter = 0;

    /* renamed from: co.cask.cdap.data2.dataset2.lib.partitioned.PartitionConsumerTest$32, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/data2/dataset2/lib/partitioned/PartitionConsumerTest$32.class */
    /*
     * Decompiler rendering of a synthetic lookup table for switching over PartitionAcceptor.Return.
     *
     * The table is indexed by the constant's ordinal() and holds the case label to dispatch to:
     * ACCEPT -> 1, SKIP -> 2, STOP -> 3. A slot that is never assigned keeps the int-array default of 0.
     */
    static /* synthetic */ class AnonymousClass32 {
        // One slot per enum constant known at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$api$dataset$lib$partitioned$PartitionAcceptor$Return = new int[PartitionAcceptor.Return.values().length];

        static {
            // Each assignment is guarded separately, so a NoSuchFieldError for one constant is ignored
            // and only leaves that constant's slot at 0 - presumably to tolerate an enum constant that
            // is missing at run time.
            try {
                $SwitchMap$co$cask$cdap$api$dataset$lib$partitioned$PartitionAcceptor$Return[PartitionAcceptor.Return.ACCEPT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$dataset$lib$partitioned$PartitionAcceptor$Return[PartitionAcceptor.Return.SKIP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$dataset$lib$partitioned$PartitionAcceptor$Return[PartitionAcceptor.Return.STOP.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Test {@link PartitionAcceptor} that filters partitions by their "s" field and can
     * optionally signal a stop at a given value of their "i" field.
     */
    public static final class CustomAcceptor implements PartitionAcceptor {
        private final String allowedSField;
        private final Integer stopOnI;

        /**
         * Creates an acceptor that never returns STOP.
         *
         * @param str the only value of the "s" field that is accepted
         */
        public CustomAcceptor(String str) {
            this(str, null);
        }

        /**
         * @param str the only value of the "s" field that is accepted
         * @param num value of the "i" field that yields STOP, or {@code null} to never stop
         */
        public CustomAcceptor(String str, @Nullable Integer num) {
            allowedSField = str;
            stopOnI = num;
        }

        /**
         * Decides what to do with a partition.
         *
         * @param partitionDetail the candidate partition
         * @return SKIP if the "s" field differs from the allowed value; otherwise STOP if a stop
         *         value is configured and equals the "i" field; otherwise ACCEPT
         */
        public PartitionAcceptor.Return accept(PartitionDetail partitionDetail) {
            String sValue = (String) partitionDetail.getPartitionKey().getField("s");
            if (!allowedSField.equals(sValue)) {
                return PartitionAcceptor.Return.SKIP;
            }
            if (stopOnI == null) {
                return PartitionAcceptor.Return.ACCEPT;
            }
            // The "i" field is only read when a stop value is configured.
            int iValue = (Integer) partitionDetail.getPartitionKey().getField("i");
            if (stopOnI.equals(iValue)) {
                return PartitionAcceptor.Return.STOP;
            }
            return PartitionAcceptor.Return.ACCEPT;
        }
    }

    /**
     * {@link ConcurrentPartitionConsumer} with a custom selection strategy, used by the tests.
     *
     * <p>If the first partition of the working set is on its last attempt, that partition is
     * consumed on its own. Otherwise every available partition that is not on its last attempt
     * and that the acceptor accepts is consumed.
     */
    public static final class CustomConsumer extends ConcurrentPartitionConsumer {
        public CustomConsumer(PartitionedFileSet partitionedFileSet, StatePersistor statePersistor, ConsumerConfiguration consumerConfiguration) {
            super(partitionedFileSet, statePersistor, consumerConfiguration);
        }

        /**
         * Selects the partitions to consume from the working set.
         *
         * @param consumerWorkingSet the working set; it is expired and re-populated before selection
         * @param partitionAcceptor decides, per candidate partition, whether it is taken
         * @return the taken partitions together with the partitions removed as discarded
         */
        public PartitionConsumerResult doConsume(ConsumerWorkingSet consumerWorkingSet, PartitionAcceptor partitionAcceptor) {
            doExpiry(consumerWorkingSet);
            consumerWorkingSet.populate(getPartitionedFileSet(), getConfiguration());

            // One timestamp is shared by every partition taken in this call.
            long now = System.currentTimeMillis();
            List<PartitionDetail> toConsume = new ArrayList<PartitionDetail>();
            List<? extends ConsumablePartition> partitions = consumerWorkingSet.getPartitions();

            if (!partitions.isEmpty()) {
                ConsumablePartition first = partitions.get(0);
                if (isLastAttempt(first)) {
                    // A first partition on its last attempt is consumed alone, regardless of its
                    // process state and without consulting the acceptor.
                    first.take();
                    first.setTimestamp(now);
                    toConsume.add(getPartitionedFileSet().getPartition(first.getPartitionKey()));
                    return new PartitionConsumerResult(toConsume, removeDiscardedPartitions(consumerWorkingSet));
                }
            }

            for (ConsumablePartition candidate : partitions) {
                if (ProcessState.AVAILABLE != candidate.getProcessState() || isLastAttempt(candidate)) {
                    continue;
                }
                PartitionDetail partition = getPartitionedFileSet().getPartition(candidate.getPartitionKey());
                // NOTE(review): SKIP and STOP are both no-ops here, so STOP does not end the loop.
                // This mirrors the existing behaviour - confirm it is intended before changing it.
                switch (partitionAcceptor.accept(partition)) {
                    case ACCEPT:
                        candidate.take();
                        candidate.setTimestamp(now);
                        toConsume.add(partition);
                        break;
                    default:
                        break;
                }
            }
            return new PartitionConsumerResult(toConsume, removeDiscardedPartitions(consumerWorkingSet));
        }

        /** Returns true when the partition's failure count is exactly one below the configured maximum number of retries. */
        private boolean isLastAttempt(ConsumablePartition consumablePartition) {
            return consumablePartition.getNumFailures() == getConfiguration().getMaxRetries() - 1;
        }
    }

    /**
     * {@link StatePersistor} that keeps the most recently persisted state in a field.
     * Consumers that share one instance read and write the same state.
     */
    private static final class InMemoryStatePersistor implements StatePersistor {
        /** Last array handed to {@link #persistState}; {@code null} until the first call. */
        private byte[] persisted;

        private InMemoryStatePersistor() {
            // Only instantiated from within the enclosing test class.
        }

        /** Returns the array most recently stored (the same reference, not a copy), or {@code null} if none. */
        @Nullable
        public byte[] readState() {
            return persisted;
        }

        /** Stores the given array reference, replacing any previously stored state. */
        public void persistState(byte[] bArr) {
            persisted = bArr;
        }
    }

    /**
     * Creates the partitioned file set under test and records the base location of its
     * embedded file set so that {@code after()} can verify that it was removed.
     */
    @Before
    public void before() throws Exception {
        dsFrameworkUtil.createInstance(
            "partitionedFileSet",
            pfsInstance,
            PartitionedFileSetProperties.builder()
                .setPartitioning(PARTITIONING_1)
                .setBasePath("testDir")
                .build());
        // Cast explicitly, as the other call sites do, so the PartitionedFileSet API is available.
        PartitionedFileSet pfs = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        pfsBaseLocation = pfs.getEmbeddedFileSet().getBaseLocation();
        Assert.assertTrue(pfsBaseLocation.exists());
    }

    /**
     * Deletes the datasets a test may have created (the "pfs" instance first, then the "ext"
     * instance) and verifies that the recorded base location no longer exists.
     */
    @After
    public void after() throws Exception {
        Id.DatasetInstance[] candidates = {pfsInstance, pfsExternalInstance};
        for (Id.DatasetInstance candidate : candidates) {
            // Only delete instances that actually exist.
            if (dsFrameworkUtil.getInstance(candidate) != null) {
                dsFrameworkUtil.deleteInstance(candidate);
            }
        }
        Assert.assertFalse(pfsBaseLocation.exists());
    }

    /**
     * Checks, with explicitly managed transactions, that a consumer returns a partition only
     * once the transaction that added it has finished, and returns each partition only once.
     */
    @Test
    public void testPartitionConsumer() throws Exception {
        // Declared as PartitionedFileSet so the dataset API is callable; the references are cast
        // to TransactionAware only where they are handed to the transaction context.
        PartitionedFileSet producerDataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        PartitionedFileSet consumerDataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(dsFrameworkUtil.getTxManager());

        // Add a first partition and finish the producing transaction.
        TransactionContext producerTx =
            new TransactionContext(txClient, new TransactionAware[]{(TransactionAware) producerDataset});
        producerTx.start();
        PartitionKey firstKey = generateUniqueKey();
        producerDataset.getPartitionOutput(firstKey).addPartition();
        producerTx.finish();

        // The consumer, in its own transaction, sees exactly that partition.
        TransactionContext consumerTx =
            new TransactionContext(txClient, new TransactionAware[]{(TransactionAware) consumerDataset});
        consumerTx.start();
        ConcurrentPartitionConsumer consumer =
            new ConcurrentPartitionConsumer(consumerDataset, new InMemoryStatePersistor());
        List<PartitionDetail> consumed = consumer.consumePartitions().getPartitions();
        Assert.assertEquals(1, consumed.size());
        Assert.assertEquals(firstKey, consumed.get(0).getPartitionKey());
        consumerTx.finish();

        // Add a second partition, but keep the producing transaction open for now.
        producerTx.start();
        PartitionKey secondKey = generateUniqueKey();
        producerDataset.getPartitionOutput(secondKey).addPartition();

        // While the producing transaction is still in progress, nothing new is consumed.
        consumerTx.start();
        Assert.assertTrue(consumer.consumePartitions().getPartitions().isEmpty());
        consumerTx.finish();

        producerTx.finish();

        // After the producing transaction has finished, only the second partition is returned.
        consumerTx.start();
        consumed = consumer.consumePartitions().getPartitions();
        Assert.assertEquals(1, consumed.size());
        Assert.assertEquals(secondKey, consumed.get(0).getPartitionKey());
        consumerTx.finish();
    }

    /**
     * Checks incremental consumption: a consumer returns each batch of newly added partitions
     * exactly once, and a consumer with fresh state returns all partitions.
     */
    @Test
    public void testSimplePartitionConsuming() throws Exception {
        // Declared as PartitionedFileSet so the dataset API is callable; the TransactionAware
        // view of the same object is only used to build transaction executors.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        final Set<PartitionKey> partitionKeys1 = new HashSet<PartitionKey>();
        for (int i = 0; i < 10; i++) {
            partitionKeys1.add(generateUniqueKey());
        }
        final Set<PartitionKey> partitionKeys2 = new HashSet<PartitionKey>();
        for (int i = 0; i < 15; i++) {
            partitionKeys2.add(generateUniqueKey());
        }

        final ConcurrentPartitionConsumer consumer =
            new ConcurrentPartitionConsumer(dataset, new InMemoryStatePersistor());

        // Add the first batch.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (PartitionKey key : partitionKeys1) {
                    dataset.getPartitionOutput(key).addPartition();
                }
            }
        });

        // The first consume call returns exactly the first batch.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertEquals(partitionKeys1, toKeys(consumer.consumePartitions().getPartitions()));
            }
        });

        // Add the second batch.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (PartitionKey key : partitionKeys2) {
                    dataset.getPartitionOutput(key).addPartition();
                }
            }
        });

        // Only the second batch is returned now.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertEquals(partitionKeys2, toKeys(consumer.consumePartitions().getPartitions()));
            }
        });

        // With nothing new added, nothing is returned.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertTrue(consumer.consumePartitions().getPartitions().isEmpty());
            }
        });

        // A consumer with its own, empty state returns both batches.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                ConcurrentPartitionConsumer freshConsumer =
                    new ConcurrentPartitionConsumer(dataset, new InMemoryStatePersistor());
                List<PartitionDetail> consumed = freshConsumer.consumePartitions().getPartitions();
                Set<PartitionKey> allKeys = new HashSet<PartitionKey>();
                allKeys.addAll(partitionKeys1);
                allKeys.addAll(partitionKeys2);
                Assert.assertEquals(allKeys, toKeys(consumed));
            }
        });
    }

    /**
     * Checks that a consumer keeps working after partitions it has not yet consumed are
     * dropped: after all partitions are dropped and new ones are added, only the new ones are
     * returned.
     */
    @Test
    public void testConsumeAfterDelete() throws Exception {
        // Declared as PartitionedFileSet so the dataset API is callable; the TransactionAware
        // view of the same object is only used to build transaction executors.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        final Set<PartitionKey> partitionKeys1 = new HashSet<PartitionKey>();
        for (int i = 0; i < 3; i++) {
            partitionKeys1.add(generateUniqueKey());
        }

        final ConcurrentPartitionConsumer consumer = new ConcurrentPartitionConsumer(
            dataset,
            new InMemoryStatePersistor(),
            ConsumerConfiguration.builder().setMaxWorkingSetSize(100).build());

        // Add the three tracked partitions.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (PartitionKey key : partitionKeys1) {
                    dataset.getPartitionOutput(key).addPartition();
                }
            }
        });

        // Add two more partitions in a later transaction; their keys are not tracked.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (int i = 0; i < 2; i++) {
                    dataset.getPartitionOutput(generateUniqueKey()).addPartition();
                }
            }
        });

        // Consuming with a limit of three returns the three partitions that were added first.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertEquals(partitionKeys1, toKeys(consumer.consumePartitions(3).getPartitions()));
            }
        });

        final Set<PartitionKey> partitionKeys2 = new HashSet<PartitionKey>();
        for (int i = 0; i < 5; i++) {
            partitionKeys2.add(generateUniqueKey());
        }

        // Drop every existing partition, including the two not yet consumed, then add five new ones.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (PartitionDetail existing : dataset.getPartitions(PartitionFilter.ALWAYS_MATCH)) {
                    dataset.dropPartition(existing.getPartitionKey());
                }
                for (PartitionKey key : partitionKeys2) {
                    dataset.getPartitionOutput(key).addPartition();
                }
            }
        });

        // Only the five new partitions are returned.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertEquals(partitionKeys2, toKeys(consumer.consumePartitions().getPartitions()));
            }
        });

        // Nothing is left to consume afterwards.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertTrue(consumer.consumePartitions().getPartitions().isEmpty());
            }
        });

        // A consumer with fresh state returns only the partitions that still exist.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                ConcurrentPartitionConsumer freshConsumer =
                    new ConcurrentPartitionConsumer(dataset, new InMemoryStatePersistor());
                Assert.assertEquals(partitionKeys2, toKeys(freshConsumer.consumePartitions().getPartitions()));
            }
        });
    }

    /**
     * Checks consumption with a per-call limit and with a partition predicate configured on
     * the consumer.
     */
    @Test
    public void testPartitionConsumingWithFilterAndLimit() throws Exception {
        // Declared as PartitionedFileSet so the dataset API is callable; the TransactionAware
        // view of the same object is only used to build transaction executors.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        final Set<PartitionKey> partitionKeys1 = new HashSet<PartitionKey>();
        for (int i = 0; i < 10; i++) {
            partitionKeys1.add(generateUniqueKey());
        }
        final Set<PartitionKey> partitionKeys2 = new HashSet<PartitionKey>();
        for (int i = 0; i < 15; i++) {
            partitionKeys2.add(generateUniqueKey());
        }

        final ConcurrentPartitionConsumer consumer =
            new ConcurrentPartitionConsumer(dataset, new InMemoryStatePersistor());

        // Add each partition of the first batch in its own transaction.
        for (final PartitionKey key : partitionKeys1) {
            dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
                public void apply() throws Exception {
                    dataset.getPartitionOutput(key).addPartition();
                }
            });
        }

        // Limits of 1, 5 and 5 yield 1, 5 and the remaining 4 of the 10 partitions.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                List<Partition> consumed = new ArrayList<Partition>();
                Iterables.addAll(consumed, consumer.consumePartitions(1).getPartitions());
                Assert.assertEquals(1, consumed.size());
                Iterables.addAll(consumed, consumer.consumePartitions(5).getPartitions());
                Assert.assertEquals(6, consumed.size());
                Iterables.addAll(consumed, consumer.consumePartitions(5).getPartitions());
                Assert.assertEquals(10, consumed.size());
                Assert.assertEquals(partitionKeys1, toKeys(consumed));
            }
        });

        // Add the second batch in a single transaction.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (PartitionKey key : partitionKeys2) {
                    dataset.getPartitionOutput(key).addPartition();
                }
            }
        });

        // Without a limit, the whole second batch is returned.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertEquals(partitionKeys2, toKeys(consumer.consumePartitions().getPartitions()));
            }
        });

        // Nothing is left for this consumer.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertTrue(consumer.consumePartitions().getPartitions().isEmpty());
            }
        });

        // A fresh consumer whose predicate matches a range condition on "i" with bounds 1 and 7.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                final PartitionFilter filter = PartitionFilter.builder().addRangeCondition("i", 1, 7).build();
                ConsumerConfiguration filteringConfig = ConsumerConfiguration.builder()
                    .setPartitionPredicate(new Predicate<PartitionDetail>() {
                        public boolean apply(PartitionDetail partitionDetail) {
                            return filter.match(partitionDetail.getPartitionKey());
                        }
                    })
                    .build();
                ConcurrentPartitionConsumer filteringConsumer =
                    new ConcurrentPartitionConsumer(dataset, new InMemoryStatePersistor(), filteringConfig);

                // Limits of 4 and 3 yield 4 and then only 2 more partitions.
                List<Partition> consumed = new ArrayList<Partition>();
                Iterables.addAll(consumed, filteringConsumer.consumePartitions(4).getPartitions());
                Assert.assertEquals(4, consumed.size());
                Iterables.addAll(consumed, filteringConsumer.consumePartitions(3).getPartitions());
                Assert.assertEquals(6, consumed.size());

                // The consumed partitions are expected to carry the "i" values 1 through 6.
                Set<Integer> expectedIFields = new HashSet<Integer>();
                for (int i = 1; i < 7; i++) {
                    expectedIFields.add(i);
                }
                Set<Integer> actualIFields = new HashSet<Integer>();
                for (Partition partition : consumed) {
                    actualIFields.add((Integer) partition.getPartitionKey().getField("i"));
                }
                Assert.assertEquals(expectedIFields, actualIFields);
            }
        });
    }

    /**
     * Checks that partitions handed back with {@code untake} / {@code untakeWithKeys} become
     * consumable again, and that repeated hand-backs produce no failed partitions even though
     * the consumer is configured with a maximum of one retry.
     */
    @Test
    public void testPartitionPutback() throws Exception {
        // Declared as PartitionedFileSet so the dataset API is callable; the TransactionAware
        // view of the same object is only used to build transaction executors.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        final Set<PartitionKey> partitionKeys = new HashSet<PartitionKey>();
        for (int i = 0; i < 10; i++) {
            partitionKeys.add(generateUniqueKey());
        }

        final ConcurrentPartitionConsumer consumer = new ConcurrentPartitionConsumer(
            dataset,
            new InMemoryStatePersistor(),
            ConsumerConfiguration.builder().setMaxRetries(1).build());

        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (PartitionKey key : partitionKeys) {
                    dataset.getPartitionOutput(key).addPartition();
                }
            }
        });

        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                // Everything is consumed by the first call; a second call returns nothing.
                List<PartitionDetail> consumed = consumer.consumePartitions().getPartitions();
                Assert.assertEquals(partitionKeys, toKeys(consumed));
                Assert.assertTrue(consumer.consumePartitions().getPartitions().isEmpty());

                // Hand the partitions back and consume them again, several times in a row.
                for (int i = 0; i < 5; i++) {
                    consumer.untake(consumed);
                    // Keep the whole result: both the partitions and the failed partitions are checked.
                    PartitionConsumerResult result = consumer.consumePartitions();
                    consumed = result.getPartitions();
                    Assert.assertEquals(partitionKeys, toKeys(consumed));
                    Assert.assertEquals(0, result.getFailedPartitions().size());
                }
                Assert.assertTrue(consumer.consumePartitions().getPartitions().isEmpty());

                // Handing back a single partition by key makes exactly that partition consumable again.
                Partition firstConsumed = consumed.get(0);
                consumer.untakeWithKeys(ImmutableList.of(firstConsumed.getPartitionKey()));
                List<PartitionDetail> reconsumed = consumer.consumePartitions().getPartitions();
                Assert.assertEquals(1, reconsumed.size());
                Assert.assertEquals(firstConsumed, reconsumed.get(0));
            }
        });
    }

    /**
     * Checks consumption through a {@link PartitionAcceptor}: partitions are selected by their
     * "s" field, and an acceptor with a stop value limits how many are returned by one call.
     */
    @Test
    public void testPartitionConsumingWithPartitionAcceptor() throws Exception {
        // Declared as PartitionedFileSet so the dataset API is callable; the TransactionAware
        // view of the same object is only used to build transaction executors.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        // Ten partitions with s = "partitionKeys1" and i = 0..9.
        final Set<PartitionKey> partitionKeys1 = new HashSet<PartitionKey>();
        for (int i = 0; i < 10; i++) {
            partitionKeys1.add(PartitionKey.builder()
                .addIntField("i", i)
                .addLongField("l", 17L)
                .addStringField("s", "partitionKeys1")
                .build());
        }
        // Fifteen partitions with s = "partitionKeys2" and i = 0..14.
        final Set<PartitionKey> partitionKeys2 = new HashSet<PartitionKey>();
        for (int i = 0; i < 15; i++) {
            partitionKeys2.add(PartitionKey.builder()
                .addIntField("i", i)
                .addLongField("l", 17L)
                .addStringField("s", "partitionKeys2")
                .build());
        }

        // Add both groups in one transaction.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (PartitionKey key : partitionKeys1) {
                    dataset.getPartitionOutput(key).addPartition();
                }
                for (PartitionKey key : partitionKeys2) {
                    dataset.getPartitionOutput(key).addPartition();
                }
            }
        });

        final ConcurrentPartitionConsumer consumer =
            new ConcurrentPartitionConsumer(dataset, new InMemoryStatePersistor());

        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                // Accepting only s = "partitionKeys1" returns exactly the first group.
                List<Partition> consumed = new ArrayList<Partition>();
                Iterables.addAll(consumed, consumer.consumePartitions(new CustomAcceptor("partitionKeys1")).getPartitions());
                Assert.assertEquals(partitionKeys1, toKeys(consumed));
                consumed.clear();

                // With a stop value of 8 on "i", eight partitions of the second group are returned.
                Iterables.addAll(consumed, consumer.consumePartitions(new CustomAcceptor("partitionKeys2", 8)).getPartitions());
                Assert.assertEquals(8, consumed.size());

                // An unrestricted call returns the rest, completing the second group.
                Iterables.addAll(consumed, consumer.consumePartitions().getPartitions());
                Assert.assertEquals(partitionKeys2, toKeys(consumed));
            }
        });
    }

    /**
     * Collects the partition keys of the given partitions into a set.
     *
     * @param list the partitions whose keys are wanted
     * @return a new set holding the key of every partition in {@code list}
     */
    public Set<PartitionKey> toKeys(List<? extends Partition> list) {
        Set<PartitionKey> keys = new HashSet<PartitionKey>(list.size());
        for (Partition partition : list) {
            keys.add(partition.getPartitionKey());
        }
        return keys;
    }

    /**
     * Checks that three consumers sharing one state persistor never receive the same partition
     * at the same time, and that a partition whose processing is reported as failed becomes
     * available to another consumer.
     */
    @Test
    public void testSimpleConcurrency() throws Exception {
        // Declared as PartitionedFileSet so the dataset API is callable; the TransactionAware
        // view of the same object is only used to build transaction executors.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        final Set<PartitionKey> partitionKeys = new HashSet<PartitionKey>();
        for (int i = 0; i < 10; i++) {
            partitionKeys.add(generateUniqueKey());
        }

        // All three consumers read and write the same persisted state.
        InMemoryStatePersistor sharedPersistor = new InMemoryStatePersistor();
        ConsumerConfiguration configuration = ConsumerConfiguration.builder().setMaxRetries(3).build();
        final ConcurrentPartitionConsumer consumer1 = new ConcurrentPartitionConsumer(dataset, sharedPersistor, configuration);
        final ConcurrentPartitionConsumer consumer2 = new ConcurrentPartitionConsumer(dataset, sharedPersistor, configuration);
        final ConcurrentPartitionConsumer consumer3 = new ConcurrentPartitionConsumer(dataset, sharedPersistor, configuration);

        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                for (PartitionKey key : partitionKeys) {
                    dataset.getPartitionOutput(key).addPartition();
                }
            }
        });

        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                // Consumer 1 takes one partition, consumer 2 the other nine; none is left for consumer 3.
                List<PartitionDetail> consumedBy1 = consumer1.consumePartitions(1).getPartitions();
                Assert.assertEquals(1, consumedBy1.size());
                List<PartitionDetail> consumedBy2 = consumer2.consumePartitions(10).getPartitions();
                Assert.assertEquals(9, consumedBy2.size());
                Assert.assertEquals(0, consumer3.consumePartitions().getPartitions().size());

                // Consumer 1 reports failure; its list is cleared so the partition is only
                // counted once in the final check, via consumer 3.
                consumer1.onFinish(consumedBy1, false);
                consumedBy1.clear();

                // The failed partition is now the only one consumer 3 can take.
                List<PartitionDetail> consumedBy3 = consumer3.consumePartitions(2).getPartitions();
                Assert.assertEquals(1, consumedBy3.size());

                // Consumers 3 and 2 report success, the latter by partition key.
                consumer3.onFinish(consumedBy3, true);
                consumer2.onFinishWithKeys(Lists.transform(consumedBy2, new Function<PartitionDetail, PartitionKey>() {
                    public PartitionKey apply(PartitionDetail partitionDetail) {
                        return partitionDetail.getPartitionKey();
                    }
                }), true);

                // Nothing is left to consume.
                Assert.assertEquals(0, consumer3.consumePartitions().getPartitions().size());

                // Together the consumers processed every partition.
                List<PartitionDetail> allConsumed = new ArrayList<PartitionDetail>();
                allConsumed.addAll(consumedBy1);
                allConsumed.addAll(consumedBy2);
                allConsumed.addAll(consumedBy3);
                Assert.assertEquals(partitionKeys, toKeys(allConsumed));
            }
        });
    }

    /**
     * Verifies that {@code onFinish} rejects partitions the consumer is not currently processing.
     *
     * <p>Steps exercised, all inside a single transaction after one partition has been added:
     * <ol>
     *   <li>consume the partition and abort it ({@code onFinish(..., false)});</li>
     *   <li>abort the same list again and expect an {@link IllegalStateException};</li>
     *   <li>consume the partition again and complete it ({@code onFinish(..., true)});</li>
     *   <li>complete the same list again and expect an {@link IllegalArgumentException}.</li>
     * </ol>
     *
     * @throws Exception if dataset setup or either transaction fails
     */
    @Test
    public void testOnFinishWithInvalidPartition() throws Exception {
        // Keep the dataset under its real type; the transaction executor needs the same object as a
        // TransactionAware, so a second, explicitly cast reference is used for that purpose only.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        final ConsumerConfiguration configuration = ConsumerConfiguration.builder().setMaxRetries(3).build();
        final ConcurrentPartitionConsumer partitionConsumer =
            new ConcurrentPartitionConsumer(dataset, new InMemoryStatePersistor(), configuration);
        final PartitionKey partitionKey = generateUniqueKey();

        // First transaction: register the single partition the consumer will work on.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                dataset.getPartitionOutput(partitionKey).addPartition();
            }
        });

        // Second transaction: drive the consumer through the abort / complete sequence.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                List<PartitionDetail> partitionDetails = partitionConsumer.consumePartitions(1).getPartitions();
                Assert.assertEquals(1, partitionDetails.size());

                // Abort the processing of the consumed partition.
                partitionConsumer.onFinish(partitionDetails, false);

                // Aborting the very same partitions a second time must be rejected.
                try {
                    partitionConsumer.onFinish(partitionDetails, false);
                    Assert.fail("Expected not to be able to abort a partition that is not IN_PROGRESS");
                } catch (IllegalStateException expected) {
                    // expected: this is the outcome the test is looking for
                }

                // The aborted partition is handed out again; this time mark it as successfully processed.
                partitionDetails = partitionConsumer.consumePartitions(1).getPartitions();
                Assert.assertEquals(1, partitionDetails.size());
                partitionConsumer.onFinish(partitionDetails, true);

                // Completing the same partitions a second time must be rejected as well.
                try {
                    partitionConsumer.onFinish(partitionDetails, true);
                    Assert.fail("Expected not to be able to call onFinish on a partition is not IN_PROGRESS");
                } catch (IllegalArgumentException expected) {
                    // expected: this is the outcome the test is looking for
                }
            }
        });
    }

    /**
     * Verifies the consumer's handling of a partition whose processing keeps being aborted.
     *
     * <p>The consumer is configured with {@code setMaxRetries(maxRetries)}. The test consumes and aborts the
     * single available partition {@code maxRetries + 1} times, then expects the next
     * {@code consumePartitions} call to return no partitions to process and to report that partition in
     * {@code getFailedPartitions()}.
     *
     * @throws Exception if dataset setup or either transaction fails
     */
    @Test
    public void testNumRetries() throws Exception {
        // Keep the dataset under its real type; the transaction executor needs the same object as a
        // TransactionAware, so a second, explicitly cast reference is used for that purpose only.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        // Named so that the configuration and the loop bound below cannot drift apart.
        final int maxRetries = 1;
        final ConsumerConfiguration configuration = ConsumerConfiguration.builder().setMaxRetries(maxRetries).build();
        final ConcurrentPartitionConsumer partitionConsumer =
            new ConcurrentPartitionConsumer(dataset, new InMemoryStatePersistor(), configuration);
        final PartitionKey partitionKey = generateUniqueKey();

        // First transaction: register the single partition the consumer will work on.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                dataset.getPartitionOutput(partitionKey).addPartition();
            }
        });

        // Second transaction: abort the partition repeatedly, then check that it is reported as failed.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                // One initial attempt plus maxRetries further attempts, each of them aborted.
                for (int attempt = 0; attempt < maxRetries + 1; attempt++) {
                    List<PartitionDetail> partitionDetails = partitionConsumer.consumePartitions(1).getPartitions();
                    Assert.assertEquals(1, partitionDetails.size());
                    Assert.assertEquals(partitionKey, partitionDetails.get(0).getPartitionKey());
                    partitionConsumer.onFinish(partitionDetails, false);
                }

                // The partition is no longer offered for processing; it shows up as a failed partition instead.
                PartitionConsumerResult result = partitionConsumer.consumePartitions(1);
                Assert.assertEquals(0, result.getPartitions().size());
                Assert.assertEquals(1, result.getFailedPartitions().size());
                Assert.assertEquals(partitionKey, result.getFailedPartitions().get(0).getPartitionKey());
            }
        });
    }

    /**
     * Exercises {@code CustomConsumer} against a small set of partitions.
     *
     * <p>After adding {@code numPartitions} partitions, the test expects:
     * <ul>
     *   <li>two consecutive {@code consumePartitions()} calls to each return all partitions, both batches
     *       being aborted;</li>
     *   <li>the following {@code numPartitions} calls to each return exactly one partition, each of which is
     *       marked as successfully processed.</li>
     * </ul>
     * NOTE(review): the switch from whole batches to single partitions is presumably implemented by
     * {@code CustomConsumer}, which is defined elsewhere in this file - confirm there.
     *
     * @throws Exception if dataset setup or either transaction fails
     */
    @Test
    public void testCustomOperations() throws Exception {
        // Keep the dataset under its real type; the transaction executor needs the same object as a
        // TransactionAware, so a second, explicitly cast reference is used for that purpose only.
        final PartitionedFileSet dataset = (PartitionedFileSet) dsFrameworkUtil.getInstance(pfsInstance);
        final TransactionAware txAwareDataset = (TransactionAware) dataset;

        final ConsumerConfiguration configuration = ConsumerConfiguration.builder().setMaxRetries(3).build();
        final CustomConsumer customConsumer = new CustomConsumer(dataset, new InMemoryStatePersistor(), configuration);

        final int numPartitions = 3;
        final List<PartitionKey> partitionKeys = new ArrayList<PartitionKey>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            partitionKeys.add(generateUniqueKey());
        }

        // First transaction: register every generated partition.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                for (PartitionKey partitionKey : partitionKeys) {
                    dataset.getPartitionOutput(partitionKey).addPartition();
                }
            }
        });

        // Second transaction: two aborted full batches, followed by one-at-a-time successful processing.
        dsFrameworkUtil.newInMemoryTransactionExecutor(txAwareDataset).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                // Both of the first two attempts receive all partitions at once and are aborted.
                for (int failedAttempt = 0; failedAttempt < 2; failedAttempt++) {
                    List<? extends Partition> batch = customConsumer.consumePartitions().getPartitions();
                    Assert.assertEquals(numPartitions, batch.size());
                    customConsumer.onFinish(batch, false);
                }

                // From here on each call is expected to hand out a single partition, once per partition.
                for (int i = 0; i < numPartitions; i++) {
                    List<? extends Partition> single = customConsumer.consumePartitions().getPartitions();
                    Assert.assertEquals(1, single.size());
                    customConsumer.onFinish(single, true);
                }
            }
        });
    }

    /**
     * Creates a new partition key for use in a test.
     *
     * <p>The key has three fields: int field {@code "i"} takes the current value of the {@code counter}
     * field (which is incremented as a side effect), long field {@code "l"} is always {@code 17}, and
     * string field {@code "s"} is a freshly generated random UUID string.
     *
     * @return the newly built partition key
     */
    // Decompiler note (JADX): the access modifier of this method was changed from private.
    public PartitionKey generateUniqueKey() {
        final int sequence = this.counter++;
        final String randomPart = UUID.randomUUID().toString();
        return PartitionKey.builder()
            .addIntField("i", sequence)
            .addLongField("l", 17L)
            .addStringField("s", randomPart)
            .build();
    }
}
