package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/StreamingContainerTest.class */
/**
 * Runs a single-operator logical plan on a {@code StramLocalCluster} and checks that the
 * operator's {@code committed} callback recorded at least one window id.
 */
public class StreamingContainerTest {

    /**
     * Input operator that emits no tuples and records every window id passed to its
     * {@code checkpointed} and {@code committed} callbacks in static lists.
     */
    private static class CommitAwareOperator extends BaseOperator implements Operator.CheckpointListener, InputOperator {
        /** Window ids received by {@code committed}; stays {@code null} until {@link #getCommittedWindowIdsContainer()} is called. */
        public static ArrayList<Long> committedWindowIds;

        /** Window ids received by {@code checkpointed}; this class never clears it. */
        public static ArrayList<Long> checkpointedWindowIds = new ArrayList<>();

        private static final Logger logger = LoggerFactory.getLogger(CommitAwareOperator.class);

        /** No instance state to set up. */
        private CommitAwareOperator() {
        }

        /**
         * Replaces the shared committed-ids list with a new empty one.
         *
         * @return the list that was just installed as {@link #committedWindowIds}
         */
        public static final synchronized List<Long> getCommittedWindowIdsContainer() {
            committedWindowIds = new ArrayList<>();
            return committedWindowIds;
        }

        /**
         * Records a checkpointed window.
         *
         * @param windowId id of the window reported as checkpointed
         */
        public void checkpointed(long windowId) {
            checkpointedWindowIds.add(windowId);
            logger.debug("checkpointed {}", windowId);
        }

        /**
         * Records a committed window.
         *
         * @param windowId id of the window reported as committed
         */
        public void committed(long windowId) {
            committedWindowIds.add(windowId);
            logger.debug("committed {}", windowId);
        }

        /** Intentionally emits nothing; only the checkpoint callbacks matter here. */
        public void emitTuples() {
        }
    }

    /**
     * Builds a plan containing one {@code CommitAwareOperator}, runs it on a local cluster and
     * asserts that at least one committed window id was recorded.
     */
    @Test
    public void testCommitted() throws IOException, ClassNotFoundException {
        LogicalPlan plan = new LogicalPlan();

        // Storage agent rooted at target/testCommitted, checkpoint window count set to 1.
        String storagePath = new File("target/testCommitted").getAbsolutePath();
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(storagePath, (Configuration) null));
        plan.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
        plan.addOperator("CommitAwareOperator", new CommitAwareOperator());

        // Install this test's own list before the cluster starts so commits land in it.
        List<Long> ownContainer = CommitAwareOperator.getCommittedWindowIdsContainer();

        new StramLocalCluster(plan).run(5000L);

        // If someone else swapped the static list in the meantime, the recorded ids are not ours.
        Assert.assertSame("Concurrent Use detected", ownContainer, CommitAwareOperator.committedWindowIds);
        Assert.assertFalse("No Committed Windows", ownContainer.isEmpty());
    }
}
