package org.apache.ignite.internal.processors.datastreamer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.IgniteReflectionFactory;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheSeparateDirectoryTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

/* loaded from: input_file:org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.class */
public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
    private static ConcurrentHashMap<Object, Object> storeMap;
    private CacheMode mode = CacheMode.PARTITIONED;
    private boolean nearEnabled = true;
    private boolean useCache;
    private TestStore store;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest$StringStringStreamReceiver.class */
    private static class StringStringStreamReceiver implements StreamReceiver<String, String> {
        private StringStringStreamReceiver() {
        }

        public void receive(IgniteCache<String, String> igniteCache, Collection<Map.Entry<String, String>> collection) throws IgniteException {
            igniteCache.put(IgniteMarshallerCacheSeparateDirectoryTest.KEY, Thread.currentThread().getName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest$TestDataReceiver.class */
    public static class TestDataReceiver implements StreamReceiver<String, TestObject> {
        private TestDataReceiver() {
        }

        public void receive(IgniteCache<String, TestObject> igniteCache, Collection<Map.Entry<String, TestObject>> collection) {
            for (Map.Entry<String, TestObject> entry : collection) {
                DataStreamProcessorSelfTest.assertTrue(entry.getKey() instanceof String);
                DataStreamProcessorSelfTest.assertTrue(entry.getValue() instanceof TestObject);
                igniteCache.put(entry.getKey(), new TestObject(entry.getValue().val + 1));
            }
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest$TestObject.class */
    public static class TestObject {
        public final int val;

        public TestObject(int i) {
            this.val = i;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.val == ((TestObject) obj).val;
        }

        public int hashCode() {
            return this.val;
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest$TestStore.class */
    public static class TestStore extends CacheStoreAdapter<Object, Object> {
        @Nullable
        public Object load(Object obj) {
            return DataStreamProcessorSelfTest.storeMap.get(obj);
        }

        public void write(Cache.Entry<?, ?> entry) {
            DataStreamProcessorSelfTest.storeMap.put(entry.getKey(), entry.getValue());
        }

        public void delete(Object obj) {
            DataStreamProcessorSelfTest.storeMap.remove(obj);
        }
    }

    @Override // org.apache.ignite.testframework.junits.common.GridCommonAbstractTest, org.apache.ignite.testframework.junits.GridAbstractTest
    protected void beforeTest() throws Exception {
        super.beforeTest();
        if (persistenceEnabled()) {
            cleanPersistenceDir();
        }
    }

    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTest() throws Exception {
        super.afterTest();
        this.useCache = false;
    }

    public boolean persistenceEnabled() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        configuration.setPeerClassLoadingEnabled(false);
        configuration.setIncludeProperties(new String[0]);
        if (this.useCache) {
            CacheConfiguration defaultCacheConfiguration = defaultCacheConfiguration();
            defaultCacheConfiguration.setCacheMode(this.mode);
            defaultCacheConfiguration.setAtomicityMode(getCacheAtomicityMode());
            if (this.nearEnabled) {
                defaultCacheConfiguration.setNearConfiguration(new NearCacheConfiguration());
            }
            defaultCacheConfiguration.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
            if (this.store != null) {
                defaultCacheConfiguration.setCacheStoreFactory(new IgniteReflectionFactory(TestStore.class));
                defaultCacheConfiguration.setReadThrough(true);
                defaultCacheConfiguration.setWriteThrough(true);
            }
            configuration.setCacheConfiguration(new CacheConfiguration[]{defaultCacheConfiguration});
            if (persistenceEnabled()) {
                configuration.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)).setWalMode(WALMode.LOG_ONLY));
            }
        } else {
            configuration.setCacheConfiguration(new CacheConfiguration[0]);
            configuration.setClientMode(true);
        }
        configuration.setIncludeEventTypes(EventType.EVTS_ALL);
        return configuration;
    }

    protected CacheAtomicityMode getCacheAtomicityMode() {
        return CacheAtomicityMode.TRANSACTIONAL;
    }

    protected boolean customKeepBinary() {
        return false;
    }

    @Test
    public void testPartitioned() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE);
        this.mode = CacheMode.PARTITIONED;
        checkDataStreamer();
    }

    @Test
    public void testColocated() throws Exception {
        this.mode = CacheMode.PARTITIONED;
        this.nearEnabled = false;
        checkDataStreamer();
    }

    @Test
    public void testReplicated() throws Exception {
        this.mode = CacheMode.REPLICATED;
        checkDataStreamer();
    }

    @Test
    public void testLocal() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.LOCAL_CACHE);
        this.mode = CacheMode.LOCAL;
        try {
            checkDataStreamer();
            if ($assertionsDisabled) {
            } else {
                throw new AssertionError();
            }
        } catch (CacheException e) {
            info("Caught expected exception: " + e);
        }
    }

    private void checkDataStreamer() throws Exception {
        try {
            this.useCache = true;
            IgniteEx startGrid = startGrid(2);
            startGrid(3);
            this.useCache = false;
            IgniteEx startGrid2 = startGrid(1);
            afterGridStarted();
            final IgniteDataStreamer dataStreamer = startGrid2.dataStreamer("default");
            dataStreamer.receiver(DataStreamerCacheUpdaters.batchedSorted());
            final AtomicInteger atomicInteger = new AtomicInteger();
            final CountDownLatch countDownLatch = new CountDownLatch(10);
            IgniteInternalFuture<?> multithreadedAsync = multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.1
                @Override // java.util.concurrent.Callable
                public Object call() throws Exception {
                    ArrayList arrayList = new ArrayList(400);
                    for (int i = 0; i < 400; i++) {
                        int andIncrement = atomicInteger.getAndIncrement();
                        arrayList.add(dataStreamer.addData(Integer.valueOf(andIncrement), Integer.valueOf(andIncrement)));
                    }
                    countDownLatch.countDown();
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ((IgniteFuture) it.next()).get();
                    }
                    return null;
                }
            }, 10);
            countDownLatch.await();
            stopGrid(getTestIgniteInstanceName(1), false);
            multithreadedAsync.get();
            assertEquals(4000, grid(2).cache("default").localSize(new CachePeekMode[]{CachePeekMode.PRIMARY}) + grid(3).cache("default").localSize(new CachePeekMode[]{CachePeekMode.PRIMARY}));
            final IgniteDataStreamer dataStreamer2 = startGrid.dataStreamer("default");
            dataStreamer2.receiver(DataStreamerCacheUpdaters.batchedSorted());
            final CountDownLatch countDownLatch2 = new CountDownLatch(10);
            IgniteInternalFuture<?> multithreadedAsync2 = multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.2
                @Override // java.util.concurrent.Callable
                public Object call() throws Exception {
                    ArrayList arrayList = new ArrayList(400);
                    for (int i = 0; i < 400; i++) {
                        arrayList.add(dataStreamer2.removeData(Integer.valueOf(atomicInteger.decrementAndGet())));
                    }
                    countDownLatch2.countDown();
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ((IgniteFuture) it.next()).get();
                    }
                    return null;
                }
            }, 10);
            countDownLatch2.await();
            dataStreamer2.close(false);
            multithreadedAsync2.get();
            int localSize = grid(2).cache("default").localSize(new CachePeekMode[]{CachePeekMode.PRIMARY});
            int localSize2 = grid(3).cache("default").localSize(new CachePeekMode[]{CachePeekMode.PRIMARY});
            if ($assertionsDisabled || (localSize == 0 && localSize2 == 0)) {
            } else {
                throw new AssertionError("Incorrect entries count [s2=" + localSize + ", s3=" + localSize2 + ']');
            }
        } finally {
            stopAllGrids();
        }
    }

    @Test
    public void testPartitionedIsolated() throws Exception {
        this.mode = CacheMode.PARTITIONED;
        checkIsolatedDataStreamer();
    }

    @Test
    public void testReplicatedIsolated() throws Exception {
        this.mode = CacheMode.REPLICATED;
        checkIsolatedDataStreamer();
    }

    /* JADX WARN: Finally extract failed */
    private void checkIsolatedDataStreamer() throws Exception {
        try {
            this.useCache = true;
            Ignite startGridsMultiThreaded = startGridsMultiThreaded(3);
            afterGridStarted();
            IgniteCache cache = grid(0).cache("default");
            for (int i = 0; i < 100; i++) {
                cache.put(Integer.valueOf(i), -1);
            }
            final IgniteDataStreamer dataStreamer = startGridsMultiThreaded.dataStreamer("default");
            Throwable th = null;
            try {
                try {
                    final AtomicInteger atomicInteger = new AtomicInteger();
                    multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.3
                        @Override // java.util.concurrent.Callable
                        public Object call() throws Exception {
                            for (int i2 = 0; i2 < 40000; i2++) {
                                int andIncrement = atomicInteger.getAndIncrement();
                                dataStreamer.addData(Integer.valueOf(andIncrement), Integer.valueOf(andIncrement));
                            }
                            return null;
                        }
                    }, 10).get();
                    if (dataStreamer != null) {
                        if (0 != 0) {
                            try {
                                dataStreamer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataStreamer.close();
                        }
                    }
                    for (int i2 = 0; i2 < 3; i2++) {
                        ClusterNode localNode = grid(i2).localNode();
                        GridCacheAdapter internalCache = grid(i2).internalCache("default");
                        if (internalCache.isNear()) {
                            internalCache = ((GridNearCacheAdapter) internalCache).dht();
                        }
                        Affinity affinity = internalCache.affinity();
                        int i3 = 0;
                        while (i3 < 400000) {
                            if (affinity.isPrimary(localNode, Integer.valueOf(i3)) || affinity.isBackup(localNode, Integer.valueOf(i3))) {
                                GridCacheEntryEx entryEx = internalCache.entryEx(Integer.valueOf(i3));
                                while (true) {
                                    try {
                                        entryEx.lockEntry();
                                        if (!entryEx.obsolete()) {
                                            break;
                                        }
                                        entryEx.unlockEntry();
                                        entryEx = internalCache.entryEx(Integer.valueOf(i3));
                                    } catch (Throwable th3) {
                                        entryEx.unlockEntry();
                                        throw th3;
                                    }
                                }
                                entryEx.unswap();
                                assertEquals(new Integer(i3 < 100 ? -1 : i3), CU.value(entryEx.rawGet(), internalCache.context(), false));
                                entryEx.unlockEntry();
                            }
                            i3++;
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            stopAllGrids();
        }
    }

    @Test
    public void testPrimitiveArrays() throws Exception {
        try {
            this.useCache = true;
            this.mode = CacheMode.PARTITIONED;
            IgniteEx startGrid = startGrid(1);
            startGrid(2);
            afterGridStarted();
            List asList = Arrays.asList(new byte[]{1}, new boolean[]{true, false}, new char[]{2, 3}, new short[]{3, 4}, new int[]{4, 5}, new long[]{5, 6}, new float[]{6.0f, 7.0f}, new double[]{7.0d, 8.0d});
            IgniteDataStreamer dataStreamer = startGrid.dataStreamer("default");
            int size = asList.size();
            for (int i = 0; i < 1000; i++) {
                Object obj = asList.get(i % size);
                dataStreamer.addData(Integer.valueOf(i), obj);
                dataStreamer.addData(Integer.valueOf(i), fixedClosure(obj));
            }
            dataStreamer.close(false);
            stopAllGrids();
        } catch (Throwable th) {
            stopAllGrids();
            throw th;
        }
    }

    @Test
    public void testReplicatedMultiThreaded() throws Exception {
        this.mode = CacheMode.REPLICATED;
        checkLoaderMultithreaded(1, 2);
    }

    @Test
    public void testPartitionedMultiThreaded() throws Exception {
        this.mode = CacheMode.PARTITIONED;
        checkLoaderMultithreaded(1, 3);
    }

    protected void checkLoaderMultithreaded(int i, int i2) throws Exception {
        try {
            this.useCache = true;
            startGridsMultiThreaded(i2);
            this.useCache = false;
            startGridsMultiThreaded(i2, i);
            IgniteEx grid = grid((i2 + i) - 1);
            afterGridStarted();
            final IgniteDataStreamer dataStreamer = grid.dataStreamer("default");
            dataStreamer.receiver(DataStreamerCacheUpdaters.individual());
            dataStreamer.perNodeBufferSize(2);
            dataStreamer.perThreadBufferSize(1);
            final AtomicInteger atomicInteger = new AtomicInteger();
            final AtomicBoolean atomicBoolean = new AtomicBoolean();
            try {
                IgniteInternalFuture<?> multithreadedAsync = multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.4
                    @Override // java.util.concurrent.Callable
                    public Object call() throws Exception {
                        ArrayList arrayList = new ArrayList();
                        while (true) {
                            if (atomicBoolean.get()) {
                                break;
                            }
                            int andIncrement = atomicInteger.getAndIncrement();
                            if (andIncrement >= 50000) {
                                DataStreamProcessorSelfTest.this.info(">>> Stopping producer thread since maximum count of puts reached.");
                                break;
                            }
                            arrayList.add(dataStreamer.addData(Integer.valueOf(andIncrement), Integer.valueOf(andIncrement)));
                        }
                        dataStreamer.flush();
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            ((IgniteFuture) it.next()).get();
                        }
                        return null;
                    }
                }, 5, "producer");
                IgniteInternalFuture<?> multithreadedAsync2 = multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.5
                    @Override // java.util.concurrent.Callable
                    public Object call() throws Exception {
                        while (!atomicBoolean.get()) {
                            dataStreamer.flush();
                            U.sleep(100L);
                        }
                        return null;
                    }
                }, 1, "flusher");
                final int i3 = i2 + i + 1;
                IgniteInternalFuture<?> multithreadedAsync3 = multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.6
                    @Override // java.util.concurrent.Callable
                    public Object call() throws Exception {
                        for (int i4 = 0; i4 < 5; i4++) {
                            try {
                                UUID id = DataStreamProcessorSelfTest.this.startGrid(i3).cluster().localNode().id();
                                DataStreamProcessorSelfTest.this.info(">>>>>>> Started node: " + id);
                                U.sleep(1000L);
                                DataStreamProcessorSelfTest.this.stopGrid(DataStreamProcessorSelfTest.this.getTestIgniteInstanceName(i3), true);
                                DataStreamProcessorSelfTest.this.info(">>>>>>> Stopped node: " + id);
                            } finally {
                                atomicBoolean.set(true);
                                DataStreamProcessorSelfTest.this.info("Start stop thread finished.");
                            }
                        }
                        return null;
                    }
                }, 1, "start-stop-thread");
                multithreadedAsync.get();
                multithreadedAsync2.get();
                multithreadedAsync3.get();
                dataStreamer.close(false);
            } catch (Throwable th) {
                dataStreamer.close(false);
                throw th;
            }
        } finally {
            stopAllGrids();
        }
    }

    @Test
    public void testLoaderApi() throws Exception {
        this.useCache = true;
        try {
            IgniteEx startGrid = startGrid(1);
            afterGridStarted();
            IgniteDataStreamer dataStreamer = startGrid.dataStreamer("default");
            dataStreamer.close(false);
            try {
                dataStreamer.addData(0, 0);
            } catch (IllegalStateException e) {
                info("Caught expected exception: " + e);
            }
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !dataStreamer.future().isDone()) {
                throw new AssertionError();
            }
            dataStreamer.future().get();
            try {
                dataStreamer = startGrid.dataStreamer("UNKNOWN_CACHE");
            } catch (IllegalStateException e2) {
                info("Caught expected exception: " + e2);
            }
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            dataStreamer.close(true);
            if (!$assertionsDisabled && !dataStreamer.future().isDone()) {
                throw new AssertionError();
            }
            dataStreamer.future().get();
            IgniteDataStreamer dataStreamer2 = startGrid.dataStreamer("default");
            dataStreamer2.future().cancel();
            try {
                dataStreamer2.addData(0, 0);
            } catch (IllegalStateException e3) {
                info("Caught expected exception: " + e3);
            }
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !dataStreamer2.future().isDone()) {
                throw new AssertionError();
            }
            try {
                dataStreamer2.future().get();
            } catch (IgniteFutureCancelledException e4) {
                info("Caught expected exception: " + e4);
            }
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            IgniteDataStreamer dataStreamer3 = startGrid.dataStreamer("default");
            stopGrid(getTestIgniteInstanceName(1), false);
            try {
                dataStreamer3.addData(0, 0);
            } catch (IllegalStateException e5) {
                info("Caught expected exception: " + e5);
            }
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !dataStreamer3.future().isDone()) {
                throw new AssertionError();
            }
            dataStreamer3.future().get();
            stopAllGrids();
        } catch (Throwable th) {
            stopAllGrids();
            throw th;
        }
    }

    private static Callable<Integer> callable(@Nullable final Integer num) {
        return new Callable<Integer>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.7
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return num;
            }
        };
    }

    private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer num) {
        return new IgniteClosure<Integer, Integer>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.8
            public Integer apply(Integer num2) {
                return Integer.valueOf(num2 == null ? num.intValue() : num2.intValue() + num.intValue());
            }
        };
    }

    private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T t) {
        return new IgniteClosure<T, T>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.9
            static final /* synthetic */ boolean $assertionsDisabled;

            public T apply(T t2) {
                if ($assertionsDisabled || t2 == null || t == null || t2.getClass() == t.getClass()) {
                    return (T) t;
                }
                throw new AssertionError("Expects the same types [e=" + t2 + ", obj=" + t + ']');
            }

            static {
                $assertionsDisabled = !DataStreamProcessorSelfTest.class.desiredAssertionStatus();
            }
        };
    }

    private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T t) {
        return new IgniteClosure<T, T>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.10
            public T apply(T t2) {
                if (t == null) {
                    if (t2 == null) {
                        return null;
                    }
                } else if (t.equals(t2)) {
                    return null;
                }
                throw new AssertionError("Unexpected value [exp=" + t + ", act=" + t2 + ']');
            }
        };
    }

    @Test
    public void testFlush() throws Exception {
        this.mode = CacheMode.PARTITIONED;
        this.useCache = true;
        try {
            IgniteEx startGrid = startGrid();
            afterGridStarted();
            final IgniteCache cache = startGrid.cache("default");
            final IgniteDataStreamer dataStreamer = startGrid.dataStreamer("default");
            dataStreamer.perNodeBufferSize(10);
            for (int i = 0; i < 9; i++) {
                dataStreamer.addData(Integer.valueOf(i), Integer.valueOf(i));
            }
            assertTrue(cache.localSize(new CachePeekMode[0]) == 0);
            multithreaded(new Callable<Void>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.11
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    dataStreamer.flush();
                    DataStreamProcessorSelfTest.assertEquals(9, cache.size(new CachePeekMode[0]));
                    return null;
                }
            }, 5, "flush-checker");
            dataStreamer.addData(100, 100);
            dataStreamer.flush();
            assertEquals(10, cache.size(new CachePeekMode[0]));
            dataStreamer.addData(Integer.valueOf(GridTestMessage.DIRECT_TYPE), Integer.valueOf(GridTestMessage.DIRECT_TYPE));
            dataStreamer.close(false);
            dataStreamer.future().get();
            assertEquals(11, cache.size(new CachePeekMode[0]));
            stopAllGrids();
        } catch (Throwable th) {
            stopAllGrids();
            throw th;
        }
    }

    @Test
    public void testTryFlush() throws Exception {
        this.mode = CacheMode.PARTITIONED;
        this.useCache = true;
        try {
            IgniteEx startGrid = startGrid();
            afterGridStarted();
            IgniteCache cache = startGrid.cache("default");
            IgniteDataStreamer dataStreamer = startGrid.dataStreamer("default");
            dataStreamer.perNodeBufferSize(10);
            for (int i = 0; i < 9; i++) {
                dataStreamer.addData(Integer.valueOf(i), Integer.valueOf(i));
            }
            assertTrue(cache.localSize(new CachePeekMode[0]) == 0);
            dataStreamer.tryFlush();
            Thread.sleep(100L);
            assertEquals(9, cache.size(new CachePeekMode[0]));
            dataStreamer.close(false);
            stopAllGrids();
        } catch (Throwable th) {
            stopAllGrids();
            throw th;
        }
    }

    @Test
    public void testFlushTimeout() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);
        this.mode = CacheMode.PARTITIONED;
        this.useCache = true;
        try {
            IgniteEx startGrid = startGrid();
            afterGridStarted();
            final CountDownLatch countDownLatch = new CountDownLatch(9);
            startGrid.events().localListen(new IgnitePredicate<Event>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.12
                public boolean apply(Event event) {
                    countDownLatch.countDown();
                    return true;
                }
            }, new int[]{63});
            IgniteCache cache = startGrid.cache("default");
            assertTrue(cache.localSize(new CachePeekMode[0]) == 0);
            IgniteDataStreamer dataStreamer = startGrid.dataStreamer("default");
            dataStreamer.perNodeBufferSize(10);
            dataStreamer.autoFlushFrequency(3000L);
            dataStreamer.allowOverwrite(true);
            for (int i = 0; i < 9; i++) {
                dataStreamer.addData(Integer.valueOf(i), Integer.valueOf(i));
            }
            assertTrue(cache.localSize(new CachePeekMode[0]) == 0);
            assertFalse(countDownLatch.await(1000L, TimeUnit.MILLISECONDS));
            assertTrue(cache.localSize(new CachePeekMode[0]) == 0);
            assertTrue(countDownLatch.await(3000L, TimeUnit.MILLISECONDS));
            assertEquals(9, cache.size(new CachePeekMode[0]));
            dataStreamer.close(false);
            stopAllGrids();
        } catch (Throwable th) {
            stopAllGrids();
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testUpdateStore() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
        storeMap = new ConcurrentHashMap<>();
        try {
            this.store = new TestStore();
            this.useCache = true;
            Ignite startGridsMultiThreaded = startGridsMultiThreaded(3);
            afterGridStarted();
            for (int i = 0; i < 1000; i++) {
                storeMap.put(Integer.valueOf(i), Integer.valueOf(i));
            }
            IgniteDataStreamer dataStreamer = startGridsMultiThreaded.dataStreamer("default");
            Throwable th = null;
            try {
                dataStreamer.allowOverwrite(true);
                assertFalse(dataStreamer.skipStore());
                for (int i2 = 0; i2 < 1000; i2++) {
                    dataStreamer.removeData(Integer.valueOf(i2));
                }
                for (int i3 = 1000; i3 < 2000; i3++) {
                    dataStreamer.addData(Integer.valueOf(i3), Integer.valueOf(i3));
                }
                if (dataStreamer != null) {
                    if (0 != 0) {
                        try {
                            dataStreamer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        dataStreamer.close();
                    }
                }
                for (int i4 = 0; i4 < 1000; i4++) {
                    assertNull(storeMap.get(Integer.valueOf(i4)));
                }
                for (int i5 = 1000; i5 < 2000; i5++) {
                    assertEquals(Integer.valueOf(i5), storeMap.get(Integer.valueOf(i5)));
                }
                IgniteDataStreamer dataStreamer2 = startGridsMultiThreaded.dataStreamer("default");
                Throwable th3 = null;
                try {
                    try {
                        dataStreamer2.allowOverwrite(true);
                        dataStreamer2.skipStore(true);
                        for (int i6 = 0; i6 < 1000; i6++) {
                            dataStreamer2.addData(Integer.valueOf(i6), Integer.valueOf(i6));
                        }
                        for (int i7 = 1000; i7 < 2000; i7++) {
                            dataStreamer2.removeData(Integer.valueOf(i7));
                        }
                        if (dataStreamer2 != null) {
                            if (0 != 0) {
                                try {
                                    dataStreamer2.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                dataStreamer2.close();
                            }
                        }
                        IgniteCache cache = startGridsMultiThreaded.cache("default");
                        for (int i8 = 0; i8 < 1000; i8++) {
                            assertNull(storeMap.get(Integer.valueOf(i8)));
                            assertEquals(Integer.valueOf(i8), cache.get(Integer.valueOf(i8)));
                        }
                        for (int i9 = 1000; i9 < 2000; i9++) {
                            assertEquals(Integer.valueOf(i9), storeMap.get(Integer.valueOf(i9)));
                            assertNull(cache.localPeek(Integer.valueOf(i9), new CachePeekMode[]{CachePeekMode.ONHEAP}));
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th5) {
                if (dataStreamer != null) {
                    if (0 != 0) {
                        try {
                            dataStreamer.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        dataStreamer.close();
                    }
                }
                throw th5;
            }
        } finally {
            storeMap = null;
            stopAllGrids();
        }
    }

    @Test
    public void testCustomUserUpdater() throws Exception {
        this.useCache = true;
        try {
            Ignite startGridsMultiThreaded = startGridsMultiThreaded(3);
            afterGridStarted();
            IgniteDataStreamer dataStreamer = startGridsMultiThreaded.dataStreamer("default");
            Throwable th = null;
            try {
                try {
                    dataStreamer.allowOverwrite(true);
                    dataStreamer.keepBinary(customKeepBinary());
                    dataStreamer.receiver(getStreamReceiver());
                    for (int i = 0; i < 100; i++) {
                        dataStreamer.addData(String.valueOf(i), new TestObject(i));
                    }
                    if (dataStreamer != null) {
                        if (0 != 0) {
                            try {
                                dataStreamer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataStreamer.close();
                        }
                    }
                    IgniteCache cache = startGridsMultiThreaded.cache("default");
                    for (int i2 = 0; i2 < 100; i2++) {
                        TestObject testObject = (TestObject) cache.get(String.valueOf(i2));
                        assertNotNull(testObject);
                        assertEquals(i2 + 1, testObject.val);
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            stopAllGrids();
        }
    }

    @Test
    public void testLocalDataStreamerDedicatedThreadPool() throws Exception {
        try {
            this.useCache = true;
            IgniteEx startGrid = startGrid(1);
            afterGridStarted();
            final IgniteCache cache = startGrid.cache("default");
            IgniteDataStreamer dataStreamer = startGrid.dataStreamer("default");
            Throwable th = null;
            try {
                try {
                    dataStreamer.receiver(new StreamReceiver<String, String>() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.13
                        public void receive(IgniteCache<String, String> igniteCache, Collection<Map.Entry<String, String>> collection) throws IgniteException {
                            igniteCache.put(IgniteMarshallerCacheSeparateDirectoryTest.KEY, Thread.currentThread().getName());
                        }
                    });
                    dataStreamer.addData(IgniteMarshallerCacheSeparateDirectoryTest.KEY, "value");
                    dataStreamer.tryFlush();
                    GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.14
                        public boolean apply() {
                            return cache.get(IgniteMarshallerCacheSeparateDirectoryTest.KEY) != null;
                        }
                    }, 3000L);
                    if (dataStreamer != null) {
                        if (0 != 0) {
                            try {
                                dataStreamer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataStreamer.close();
                        }
                    }
                    assertNotNull(cache.get(IgniteMarshallerCacheSeparateDirectoryTest.KEY));
                    assertTrue(((String) cache.get(IgniteMarshallerCacheSeparateDirectoryTest.KEY)).startsWith("data-streamer"));
                    stopAllGrids();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th3) {
            stopAllGrids();
            throw th3;
        }
    }

    @Test
    public void testRemoteDataStreamerDedicatedThreadPool() throws Exception {
        try {
            this.useCache = true;
            IgniteEx startGrid = startGrid(1);
            this.useCache = false;
            IgniteEx startGrid2 = startGrid(0);
            afterGridStarted();
            final IgniteCache cache = startGrid.cache("default");
            IgniteDataStreamer dataStreamer = startGrid2.dataStreamer("default");
            Throwable th = null;
            try {
                try {
                    dataStreamer.receiver(new StringStringStreamReceiver());
                    dataStreamer.addData(IgniteMarshallerCacheSeparateDirectoryTest.KEY, "value");
                    dataStreamer.tryFlush();
                    GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest.15
                        public boolean apply() {
                            return cache.get(IgniteMarshallerCacheSeparateDirectoryTest.KEY) != null;
                        }
                    }, 3000L);
                    if (dataStreamer != null) {
                        if (0 != 0) {
                            try {
                                dataStreamer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataStreamer.close();
                        }
                    }
                    assertNotNull(cache.get(IgniteMarshallerCacheSeparateDirectoryTest.KEY));
                    assertTrue(((String) cache.get(IgniteMarshallerCacheSeparateDirectoryTest.KEY)).startsWith("data-streamer"));
                    stopAllGrids();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th3) {
            stopAllGrids();
            throw th3;
        }
    }

    protected StreamReceiver<String, TestObject> getStreamReceiver() {
        return new TestDataReceiver();
    }

    private void afterGridStarted() throws InterruptedException {
        G.allGrids().stream().filter(ignite -> {
            return !ignite.cluster().node().isClient();
        }).findAny().filter(ignite2 -> {
            return !ignite2.cluster().active();
        }).ifPresent(ignite3 -> {
            ignite3.cluster().active(true);
        });
        awaitPartitionMapExchange();
    }

    static {
        $assertionsDisabled = !DataStreamProcessorSelfTest.class.desiredAssertionStatus();
    }
}
