package org.apache.ignite.internal;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.cache.CacheException;
import junit.framework.TestCase;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.GridTestUtils;

/* loaded from: input_file:org/apache/ignite/internal/IgniteClientReconnectStreamerTest.class */
public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbstractTest {
    public static final String CACHE_NAME = "streamer";

    @Override // org.apache.ignite.internal.IgniteClientReconnectAbstractTest
    protected int serverCount() {
        return 1;
    }

    @Override // org.apache.ignite.internal.IgniteClientReconnectAbstractTest
    protected int clientCount() {
        return 1;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.internal.IgniteClientReconnectAbstractTest, org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        configuration.setCacheConfiguration(new CacheConfiguration[]{new CacheConfiguration(CACHE_NAME).setAtomicityMode(CacheAtomicityMode.ATOMIC).setCacheMode(CacheMode.PARTITIONED)});
        return configuration;
    }

    public void testStreamerReconnect() throws Exception {
        final IgniteEx grid = grid(serverCount());
        assertTrue(grid.cluster().localNode().isClient());
        Ignite clientRouter = clientRouter(grid);
        final IgniteCache cache = clientRouter.cache(CACHE_NAME);
        IgniteDataStreamer<Integer, Integer> dataStreamer = grid.dataStreamer(CACHE_NAME);
        for (int i = 0; i < 50; i++) {
            dataStreamer.addData(Integer.valueOf(i), Integer.valueOf(i));
        }
        dataStreamer.flush();
        GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.internal.IgniteClientReconnectStreamerTest.1
            public boolean apply() {
                return cache.localSize(new CachePeekMode[0]) == 50;
            }
        }, 2000L);
        assertEquals(50, cache.localSize(new CachePeekMode[0]));
        reconnectClientNode(grid, clientRouter, new Runnable() { // from class: org.apache.ignite.internal.IgniteClientReconnectStreamerTest.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    grid.dataStreamer(IgniteClientReconnectStreamerTest.CACHE_NAME);
                    TestCase.fail();
                } catch (IgniteClientDisconnectedException e) {
                    TestCase.assertNotNull(e.reconnectFuture());
                }
            }
        });
        checkStreamerClosed(dataStreamer);
        IgniteDataStreamer dataStreamer2 = grid.dataStreamer(CACHE_NAME);
        for (int i2 = 50; i2 < 100; i2++) {
            dataStreamer2.addData(Integer.valueOf(i2), Integer.valueOf(i2));
        }
        dataStreamer2.flush();
        GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.internal.IgniteClientReconnectStreamerTest.3
            public boolean apply() {
                return cache.localSize(new CachePeekMode[0]) == 100;
            }
        }, 2000L);
        assertEquals(100, cache.localSize(new CachePeekMode[0]));
        dataStreamer2.close();
        dataStreamer2.future().get(2L, TimeUnit.SECONDS);
        cache.removeAll();
    }

    public void testStreamerReconnectInProgress() throws Exception {
        IgniteEx grid = grid(serverCount());
        assertTrue(grid.cluster().localNode().isClient());
        Ignite clientRouter = clientRouter(grid);
        final IgniteCache cache = clientRouter.cache(CACHE_NAME);
        final IgniteDataStreamer<Integer, Integer> dataStreamer = grid.dataStreamer(CACHE_NAME);
        IgniteClientReconnectAbstractTest.BlockTcpCommunicationSpi commSpi = commSpi(clientRouter);
        commSpi.blockMessage(DataStreamerResponse.class);
        final IgniteInternalFuture<?> runAsync = GridTestUtils.runAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.IgniteClientReconnectStreamerTest.4
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                for (int i = 0; i < 50; i++) {
                    try {
                        dataStreamer.addData(Integer.valueOf(i), Integer.valueOf(i));
                    } catch (CacheException e) {
                        IgniteClientReconnectStreamerTest.this.checkAndWait(e);
                        return true;
                    } finally {
                        dataStreamer.close();
                    }
                }
                dataStreamer.flush();
                return false;
            }
        });
        GridTestUtils.assertThrows(this.log, new Callable<Object>() { // from class: org.apache.ignite.internal.IgniteClientReconnectStreamerTest.5
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                return runAsync.get(200L);
            }
        }, IgniteFutureTimeoutCheckedException.class, null);
        assertNotDone(runAsync);
        commSpi.unblockMessage();
        reconnectClientNode(grid, clientRouter, null);
        assertTrue(((Boolean) runAsync.get(2L, TimeUnit.SECONDS)).booleanValue());
        checkStreamerClosed(dataStreamer);
        IgniteDataStreamer dataStreamer2 = grid.dataStreamer(CACHE_NAME);
        for (int i = 0; i < 50; i++) {
            dataStreamer2.addData(Integer.valueOf(i), Integer.valueOf(i));
        }
        dataStreamer2.close();
        GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.internal.IgniteClientReconnectStreamerTest.6
            public boolean apply() {
                return cache.localSize(new CachePeekMode[0]) == 50;
            }
        }, 2000L);
        assertEquals(50, cache.localSize(new CachePeekMode[0]));
    }

    private void checkStreamerClosed(IgniteDataStreamer<Integer, Integer> igniteDataStreamer) {
        try {
            igniteDataStreamer.addData(100, 100).get();
            fail();
        } catch (CacheException e) {
            checkAndWait(e);
        }
        try {
            igniteDataStreamer.flush();
            fail();
        } catch (CacheException e2) {
            checkAndWait(e2);
        }
        try {
            igniteDataStreamer.future().get();
            fail();
        } catch (CacheException e3) {
            checkAndWait(e3);
        }
        igniteDataStreamer.tryFlush();
        igniteDataStreamer.close();
    }
}
