package com.datatorrent.stram.stream;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/stream/OiOEndWindowTest.class */
public class OiOEndWindowTest {
    // Shared SLF4J logger for this test; the nested operator classes in this file log their end-window messages through it.
    private static final Logger logger = LoggerFactory.getLogger(OiOEndWindowTest.class);

    /* loaded from: input_file:com/datatorrent/stram/stream/OiOEndWindowTest$FirstGenericOperator.class */
    /**
     * Test operator that declares a {@code Number} input port and a {@code Long} output port.
     *
     * <p>Incoming tuples are ignored and nothing is emitted by this class; its only observable
     * effect is that each {@link #endWindow()} call increments {@link #endwindowCount} and logs
     * a message.
     */
    public static class FirstGenericOperator extends BaseOperator {
        /** Number of {@link #endWindow()} calls; static, so shared by every instance in the JVM. */
        public static long endwindowCount;

        /** Output port; no code in this class emits on it. */
        public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();

        /** Input port whose {@code process} body is empty, so every received tuple is dropped. */
        public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>() {
            public void process(Number tuple) {
                // Intentionally empty: this test only cares about end-window callbacks.
            }
        };

        /** Counts this end-window callback and logs it. */
        public void endWindow() {
            endwindowCount += 1;
            OiOEndWindowTest.logger.info("in end window of 1st generic operator");
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/stream/OiOEndWindowTest$SecondGenericOperator.class */
    /**
     * Test operator that declares a single {@code Number} input port and no output port.
     *
     * <p>Incoming tuples are ignored; its only observable effect is that each
     * {@link #endWindow()} call increments {@link #endwindowCount} and logs a message.
     */
    public static class SecondGenericOperator extends BaseOperator {
        /** Number of {@link #endWindow()} calls; static, so shared by every instance in the JVM. */
        public static long endwindowCount;

        /** Input port whose {@code process} body is empty, so every received tuple is dropped. */
        public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>() {
            public void process(Number tuple) {
                // Intentionally empty: this test only cares about end-window callbacks.
            }
        };

        /** Counts this end-window callback and logs it. */
        public void endWindow() {
            endwindowCount += 1;
            OiOEndWindowTest.logger.info("in end window of 2nd generic operator");
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/stream/OiOEndWindowTest$TestInputOperator.class */
    /**
     * Input operator used as the source of the test DAG.
     *
     * <p>It never emits a tuple: every {@link #emitTuples()} call goes straight to
     * {@code BaseOperator.shutdown()}.
     */
    public static class TestInputOperator extends BaseOperator implements InputOperator {
        /** Output port; no code in this class emits on it. */
        public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();

        /**
         * Invokes {@code BaseOperator.shutdown()} instead of producing data.
         * NOTE(review): presumably this is what lets the local cluster run terminate - confirm
         * against the BaseOperator documentation.
         */
        public void emitTuples() {
            BaseOperator.shutdown();
        }
    }

    /**
     * Runs a three-operator DAG in a local cluster and checks that both generic operators saw
     * the same number of {@code endWindow()} calls.
     *
     * <p>Topology: {@link TestInputOperator} -> {@link FirstGenericOperator} ->
     * {@link SecondGenericOperator}. The second stream is marked {@code THREAD_LOCAL}, and the
     * second generic operator is given an {@code APPLICATION_WINDOW_COUNT} of 2.
     *
     * @throws Exception if building, validating or running the plan fails
     */
    @Test
    public void validateOiOImplementation() throws Exception {
        // The counters are public statics, so values left over from an earlier run in the same
        // JVM (for example a retry of a failed run) would otherwise leak into the comparison.
        FirstGenericOperator.endwindowCount = 0L;
        SecondGenericOperator.endwindowCount = 0L;

        final String secondOperatorName = "Second Generic Operator";

        LogicalPlan plan = new LogicalPlan();
        String storagePath = new File("target/validateOiOImplementation").getAbsolutePath();
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(storagePath, (Configuration) null));

        TestInputOperator inputOperator = plan.addOperator("Input Operator", new TestInputOperator());
        FirstGenericOperator firstOperator = plan.addOperator("First Generic Operator", new FirstGenericOperator());
        SecondGenericOperator secondOperator = plan.addOperator(secondOperatorName, new SecondGenericOperator());
        plan.getOperatorMeta(secondOperatorName).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);

        plan.addStream("Stream", inputOperator.output, firstOperator.input);
        // Only the link between the two generic operators is thread-local.
        plan.addStream("Stream1", firstOperator.output, secondOperator.input).setLocality(DAG.Locality.THREAD_LOCAL);
        plan.validate();

        new StramLocalCluster(plan).run();

        Assert.assertEquals("End Window Count", FirstGenericOperator.endwindowCount, SecondGenericOperator.endwindowCount);
    }
}
