package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/StreamingContainerTest.class */
public class StreamingContainerTest {
    private static final Logger logger = LoggerFactory.getLogger(StreamingContainerTest.class);
    private static Set<String> committedWindowIds = Collections.synchronizedSet(new HashSet());
    private static Set<String> checkpointedWindowIds = Collections.synchronizedSet(new HashSet());

    /* loaded from: input_file:com/datatorrent/stram/engine/StreamingContainerTest$CommitAwareOperator.class */
    private static class CommitAwareOperator extends BaseOperator implements Operator.CheckpointListener, InputOperator {
        private transient String name;
        public final transient DefaultOutputPort<String> output;

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<String> input;

        private CommitAwareOperator() {
            this.output = new DefaultOutputPort<>();
            this.input = new DefaultInputPort<String>() { // from class: com.datatorrent.stram.engine.StreamingContainerTest.CommitAwareOperator.1
                public void process(String str) {
                }
            };
        }

        public void setup(Context.OperatorContext operatorContext) {
            this.name = operatorContext.getName();
        }

        public void checkpointed(long j) {
            StreamingContainerTest.checkpointedWindowIds.add(this.name);
            StreamingContainerTest.logger.debug("checkpointed {} {}", this.name, Long.valueOf(j));
        }

        public void committed(long j) {
            StreamingContainerTest.committedWindowIds.add(this.name);
            StreamingContainerTest.logger.debug("committed {} {}", this.name, Long.valueOf(j));
        }

        public void emitTuples() {
        }
    }

    @Test
    public void testCommitted() throws IOException, ClassNotFoundException {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new File("target/testCommitted").getAbsolutePath(), (Configuration) null));
        logicalPlan.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
        logicalPlan.addOperator("CommitAwareOperatorTestCommit", new CommitAwareOperator());
        new StramLocalCluster(logicalPlan).run(5000L);
        Assert.assertTrue("No Committed Windows", committedWindowIds.contains("CommitAwareOperatorTestCommit"));
    }

    @Test
    public void testOiOCommitted() throws IOException, ClassNotFoundException {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new File("target/testCommitted").getAbsolutePath(), (Configuration) null));
        logicalPlan.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
        logicalPlan.addStream("local", logicalPlan.addOperator("CommitAwareOperatorTestOioCommit1", new CommitAwareOperator()).output, logicalPlan.addOperator("CommitAwareOperatorTestOioCommit2", new CommitAwareOperator()).input).setLocality(DAG.Locality.THREAD_LOCAL);
        new StramLocalCluster(logicalPlan).run(5000L);
        Assert.assertTrue("No Committed Windows", committedWindowIds.contains("CommitAwareOperatorTestOioCommit1"));
        Assert.assertTrue("No Committed Windows", committedWindowIds.contains("CommitAwareOperatorTestOioCommit2"));
    }
}
