package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.engine.ProcessingModeTests;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/AtLeastOnceTest.class */
public class AtLeastOnceTest {
    private static final Logger logger = LoggerFactory.getLogger(AtLeastOnceTest.class);

    @Before
    public void setup() throws IOException {
        StreamingContainer.eventloop.start();
    }

    @After
    public void teardown() {
        StreamingContainer.eventloop.stop();
    }

    @Test
    public void testInputOperatorRecovery() throws Exception {
        RecoverableInputOperator.initGenTuples();
        ProcessingModeTests.CollectorOperator.collection.clear();
        LogicalPlan logicalPlan = new LogicalPlan();
        AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(new File("target/testInputOperatorRecovery").getAbsolutePath(), (Configuration) null);
        asyncFSStorageAgent.setSyncCheckpoint(true);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
        logicalPlan.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        logicalPlan.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        logicalPlan.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
        RecoverableInputOperator addOperator = logicalPlan.addOperator("LongGenerator", RecoverableInputOperator.class);
        addOperator.setMaximumTuples(30);
        addOperator.setSimulateFailure(true);
        logicalPlan.addStream("connection", addOperator.output, logicalPlan.addOperator("LongCollector", ProcessingModeTests.CollectorOperator.class).input);
        new StramLocalCluster(logicalPlan).run();
        Assert.assertEquals("Generated Outputs", 30, ProcessingModeTests.CollectorOperator.collection.size());
    }

    @Test
    public void testOperatorRecovery() throws Exception {
        RecoverableInputOperator.initGenTuples();
        ProcessingModeTests.CollectorOperator.collection.clear();
        LogicalPlan logicalPlan = new LogicalPlan();
        AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(new File("target/testOperatorRecovery").getAbsolutePath(), (Configuration) null);
        asyncFSStorageAgent.setSyncCheckpoint(true);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
        logicalPlan.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        logicalPlan.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        logicalPlan.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
        RecoverableInputOperator addOperator = logicalPlan.addOperator("LongGenerator", RecoverableInputOperator.class);
        addOperator.setMaximumTuples(30);
        addOperator.setSimulateFailure(true);
        ProcessingModeTests.CollectorOperator addOperator2 = logicalPlan.addOperator("LongCollector", ProcessingModeTests.CollectorOperator.class);
        addOperator2.setSimulateFailure(true);
        logicalPlan.addStream("connection", addOperator.output, addOperator2.input);
        new StramLocalCluster(logicalPlan).run();
        Assert.assertEquals("Generated Outputs", 30, ProcessingModeTests.CollectorOperator.collection.size());
    }

    @Test
    public void testInlineOperatorsRecovery() throws Exception {
        RecoverableInputOperator.initGenTuples();
        ProcessingModeTests.CollectorOperator.collection.clear();
        LogicalPlan logicalPlan = new LogicalPlan();
        AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(new File("target/testOperatorRecovery").getAbsolutePath(), (Configuration) null);
        asyncFSStorageAgent.setSyncCheckpoint(true);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
        logicalPlan.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        logicalPlan.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        logicalPlan.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
        RecoverableInputOperator addOperator = logicalPlan.addOperator("LongGenerator", RecoverableInputOperator.class);
        addOperator.setMaximumTuples(30);
        addOperator.setSimulateFailure(true);
        ProcessingModeTests.CollectorOperator addOperator2 = logicalPlan.addOperator("LongCollector", ProcessingModeTests.CollectorOperator.class);
        addOperator2.setSimulateFailure(true);
        logicalPlan.addStream("connection", addOperator.output, addOperator2.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
        new StramLocalCluster(logicalPlan).run();
        Assert.assertEquals("Generated Outputs", 30, ProcessingModeTests.CollectorOperator.collection.size());
    }
}
