package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import eu.rekawek.toxiproxy.model.toxic.LimitData;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDockerRule;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoConnectionFailureIT.class */
public class PipelinedMongoConnectionFailureIT {
    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMongoConnectionFailureIT.class);
    private static final DockerImageName TOXIPROXY_IMAGE = DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.9.0");
    private static final int MONGODB_DEFAULT_PORT = 27017;
    final boolean parallelDump;
    final boolean testUpdateContent;

    @Rule
    public final Network network = Network.newNetwork();

    @Rule
    public final MongoDBContainer mongoDBContainer = new MongoDBContainer(MongoDockerRule.getDockerImageName()).withNetwork(this.network).withNetworkAliases(new String[]{"mongo"}).withExposedPorts(new Integer[]{Integer.valueOf(MONGODB_DEFAULT_PORT)});

    @Rule
    public final ToxiproxyContainer toxiproxy = new ToxiproxyContainer(TOXIPROXY_IMAGE).withNetwork(this.network);

    @Rule
    public final DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();

    @Rule
    public final TemporaryFolder sortFolder = new TemporaryFolder();

    @Rule
    public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
    private Proxy proxy;
    private String mongoUriFlaky;
    private String mongoUriReliable;
    private static final int N_TREES = 10;
    private static final int N_NODES_PER_TREE = 100;

    /* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoConnectionFailureIT$UpdateContentTask.class */
    private class UpdateContentTask implements Runnable {
        private volatile boolean stop = false;

        private UpdateContentTask() {
        }

        public void stop() {
            PipelinedMongoConnectionFailureIT.LOG.info("Stopping update content task");
            this.stop = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                MongoTestBackend createNodeStore = PipelineITUtil.createNodeStore(false, PipelinedMongoConnectionFailureIT.this.mongoUriReliable, PipelinedMongoConnectionFailureIT.this.builderProvider);
                try {
                    DocumentNodeStore documentNodeStore = createNodeStore.documentNodeStore;
                    int i = 0;
                    while (!this.stop) {
                        PipelinedMongoConnectionFailureIT.LOG.info("Updating content, loop {}", Integer.valueOf(i));
                        for (int i2 = 0; i2 < PipelinedMongoConnectionFailureIT.N_TREES && !this.stop; i2++) {
                            NodeBuilder builder = documentNodeStore.getRoot().builder();
                            NodeBuilder child = builder.child("content").child("dam").child("parent" + i2);
                            for (int i3 = 0; i3 < PipelinedMongoConnectionFailureIT.N_NODES_PER_TREE; i3++) {
                                child.child("node-" + i3).setProperty("p1", "modified-" + i);
                            }
                            documentNodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
                        }
                        Thread.sleep(100L);
                        i++;
                    }
                    PipelinedMongoConnectionFailureIT.LOG.info("Done updating content, stop requested");
                    if (createNodeStore != null) {
                        createNodeStore.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Parameterized.Parameters(name = "parallelDump={0}, testUpdateContent={1}")
    public static Collection<Object[]> parameters() {
        return List.of(new Object[]{true, false}, new Object[]{false, false}, new Object[]{true, true}, new Object[]{false, true});
    }

    public PipelinedMongoConnectionFailureIT(boolean z, boolean z2) {
        this.parallelDump = z;
        this.testUpdateContent = z2;
    }

    @BeforeClass
    public static void setup() throws IOException {
        Assume.assumeTrue(MongoUtils.isAvailable());
    }

    @Before
    public void before() throws Exception {
        ToxiproxyClient toxiproxyClient = new ToxiproxyClient(this.toxiproxy.getHost(), this.toxiproxy.getControlPort());
        this.proxy = toxiproxyClient.createProxy("mongo", "0.0.0.0:8666", "mongo:27017");
        this.mongoUriFlaky = "mongodb://" + this.toxiproxy.getHost() + ":" + this.toxiproxy.getMappedPort(8666) + "/" + MongoUtils.DB;
        toxiproxyClient.createProxy("mongo-reliable", "0.0.0.0:8667", "mongo:27017");
        this.mongoUriReliable = "mongodb://" + this.toxiproxy.getHost() + ":" + this.toxiproxy.getMappedPort(8667) + "/" + MongoUtils.DB;
        MongoConnection mongoConnection = new MongoConnection(this.mongoUriReliable);
        try {
            mongoConnection.getDatabase().drop();
        } finally {
            mongoConnection.close();
        }
    }

    @Test
    public void mongoDisconnectTest() throws Exception {
        System.setProperty("oak.indexer.pipelined.mongoParallelDump.secondariesOnly", "false");
        System.setProperty("oak.indexer.pipelined.mongoParallelDump", String.valueOf(this.parallelDump));
        System.setProperty("oak.indexer.pipelined.mongoDocBatchMaxNumberOfDocuments", String.valueOf(N_NODES_PER_TREE));
        System.setProperty("oak.indexer.pipelined.mongoBatchSize", String.valueOf(5));
        MongoTestBackend createNodeStore = PipelineITUtil.createNodeStore(false, this.mongoUriReliable, this.builderProvider);
        try {
            createContent(createNodeStore.documentNodeStore);
            if (createNodeStore != null) {
                createNodeStore.close();
            }
            LOG.info("Creating a FFS: reference run without failures.");
            createNodeStore = PipelineITUtil.createNodeStore(true, this.mongoUriReliable, this.builderProvider);
            try {
                Path path = createStrategy(createNodeStore).createSortedStoreFile().toPath();
                if (createNodeStore != null) {
                    createNodeStore.close();
                }
                LOG.info("Creating a FFS: test run with disconnection to Mongo.");
                createNodeStore = PipelineITUtil.createNodeStore(true, this.mongoUriFlaky, this.builderProvider);
                try {
                    LimitData limitData = this.proxy.toxics().limitData("CUT_CONNECTION_UPSTREAM", ToxicDirection.DOWNSTREAM, 100000L);
                    ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2);
                    try {
                        ScheduledFuture<?> scheduledFuture = null;
                        UpdateContentTask updateContentTask = null;
                        if (this.testUpdateContent) {
                            updateContentTask = new UpdateContentTask();
                            scheduledFuture = newScheduledThreadPool.schedule(updateContentTask, 1L, TimeUnit.SECONDS);
                        }
                        ScheduledFuture<?> schedule = newScheduledThreadPool.schedule(() -> {
                            try {
                                LOG.info("Removing connection block");
                                limitData.remove();
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }, 3L, TimeUnit.SECONDS);
                        Path path2 = createStrategy(createNodeStore).createSortedStoreFile().toPath();
                        if (scheduledFuture != null) {
                            updateContentTask.stop();
                            try {
                                scheduledFuture.get();
                            } catch (InterruptedException e) {
                            }
                        }
                        schedule.get();
                        newScheduledThreadPool.shutdown();
                        if (!newScheduledThreadPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                            LOG.warn("Executor did not terminate in time");
                        }
                        if (createNodeStore != null) {
                            createNodeStore.close();
                        }
                        LOG.info("Comparing resulting FFS with and without Mongo disconnections: {} {}", path, path2);
                        Assert.assertEquals(Files.readAllLines(path), Files.readAllLines(path2));
                    } catch (Throwable th) {
                        newScheduledThreadPool.shutdown();
                        if (!newScheduledThreadPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                            LOG.warn("Executor did not terminate in time");
                        }
                        throw th;
                    }
                } finally {
                    if (createNodeStore != null) {
                        try {
                            createNodeStore.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    private PipelinedStrategy createStrategy(MongoTestBackend mongoTestBackend) throws IOException {
        return PipelineITUtil.createStrategy(mongoTestBackend, str -> {
            return true;
        }, null, this.sortFolder.newFolder());
    }

    private static void createContent(NodeStore nodeStore) throws CommitFailedException {
        LOG.info("Creating content");
        String repeat = "0123456789".repeat(500);
        Stopwatch createStarted = Stopwatch.createStarted();
        for (int i = 0; i < N_TREES; i++) {
            NodeBuilder builder = nodeStore.getRoot().builder();
            NodeBuilder child = builder.child("content").child("dam").child("parent" + i);
            for (int i2 = 0; i2 < N_NODES_PER_TREE; i2++) {
                child.child("node-" + i2).setProperty("payload", repeat).setProperty("p1", "value-" + i + "-" + i2);
            }
            nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
        }
        LOG.info("Done creating content in {} ms", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
    }
}
