package com.datatorrent.contrib.formatter;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.parser.CsvPOJOParserTest;
import com.datatorrent.lib.appdata.schemas.SchemaUtils;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import java.io.IOException;
import javax.validation.ConstraintViolationException;
import org.apache.hadoop.conf.Configuration;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

/* loaded from: input_file:com/datatorrent/contrib/formatter/CsvFormatterTest.class */
public class CsvFormatterTest {
    private static final String filename = "schema.json";
    CsvFormatter operator;
    CollectorTestSink<Object> validDataSink;
    CollectorTestSink<String> invalidDataSink;

    @Rule
    public Watcher watcher = new Watcher();

    /* loaded from: input_file:com/datatorrent/contrib/formatter/CsvFormatterTest$CsvParserApplication.class */
    public static class CsvParserApplication implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            PojoEmitter addOperator = dag.addOperator("data", new PojoEmitter());
            CsvFormatter addOperator2 = dag.addOperator("formatter", new CsvFormatter());
            dag.getMeta(addOperator2).getMeta(addOperator2.in).getAttributes().put(Context.PortContext.TUPLE_CLASS, CsvPOJOParserTest.Ad.class);
            addOperator2.setSchema(SchemaUtils.jarResourceFileToString(CsvFormatterTest.filename));
            ConsoleOutputOperator addOperator3 = dag.addOperator("output", new ConsoleOutputOperator());
            ConsoleOutputOperator addOperator4 = dag.addOperator("error", new ConsoleOutputOperator());
            addOperator3.setDebug(true);
            dag.addStream("input", addOperator.output, addOperator2.in);
            dag.addStream("output", addOperator2.out, addOperator3.input);
            dag.addStream("err", addOperator2.err, addOperator4.input);
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/formatter/CsvFormatterTest$PojoEmitter.class */
    public static class PojoEmitter extends BaseOperator implements InputOperator {
        public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();

        public void emitTuples() {
            CsvPOJOParserTest.Ad ad = new CsvPOJOParserTest.Ad();
            ad.setCampaignId(9823);
            ad.setAdId(1234);
            ad.setAdName("ad");
            ad.setBidPrice(1.2d);
            ad.setStartDate(new DateTime().withDate(2015, 1, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).toDate());
            ad.setEndDate(new DateTime().withDate(2016, 1, 1).toDate());
            ad.setSecurityCode(12345678L);
            ad.setParentCampaign("CAMP_AD");
            ad.setActive(true);
            ad.setWeatherTargeted('y');
            ad.setValid("valid");
            this.output.emit(ad);
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/formatter/CsvFormatterTest$Watcher.class */
    public class Watcher extends TestWatcher {
        public Watcher() {
        }

        protected void starting(Description description) {
            super.starting(description);
            CsvFormatterTest.this.operator = new CsvFormatter();
            CsvFormatterTest.this.operator.setClazz(CsvPOJOParserTest.Ad.class);
            CsvFormatterTest.this.operator.setSchema(SchemaUtils.jarResourceFileToString(CsvFormatterTest.filename));
            CsvFormatterTest.this.validDataSink = new CollectorTestSink<>();
            CsvFormatterTest.this.invalidDataSink = new CollectorTestSink<>();
            TestUtils.setSink(CsvFormatterTest.this.operator.out, CsvFormatterTest.this.validDataSink);
            TestUtils.setSink(CsvFormatterTest.this.operator.err, CsvFormatterTest.this.invalidDataSink);
        }

        protected void finished(Description description) {
            super.finished(description);
            CsvFormatterTest.this.operator.teardown();
        }
    }

    @Test
    public void testPojoReaderToCsv() {
        this.operator.setup((Context.OperatorContext) null);
        CsvPOJOParserTest.Ad ad = new CsvPOJOParserTest.Ad();
        ad.setCampaignId(9823);
        ad.setAdId(1234);
        ad.setAdName("ad");
        ad.setBidPrice(1.2d);
        ad.setStartDate(new DateTime().withDate(2015, 1, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).toDate());
        ad.setEndDate(new DateTime().withDate(2016, 1, 1).toDate());
        ad.setSecurityCode(12345678L);
        ad.setParentCampaign("CAMP_AD");
        ad.setActive(true);
        ad.setWeatherTargeted('y');
        ad.setValid("valid");
        this.operator.in.process(ad);
        Assert.assertEquals(1L, this.validDataSink.collectedTuples.size());
        Assert.assertEquals(0L, this.invalidDataSink.collectedTuples.size());
        String str = (String) this.validDataSink.collectedTuples.get(0);
        Assert.assertNotNull(str);
        Assert.assertEquals("1234,9823,ad,1.2,2015-01-01 00:00:00,01/01/2016,12345678,true,false,CAMP_AD,y,valid\r\n", str);
        Assert.assertEquals(1L, this.operator.getIncomingTuplesCount());
        Assert.assertEquals(0L, this.operator.getErrorTupleCount());
        Assert.assertEquals(1L, this.operator.getEmittedObjectCount());
    }

    @Test
    public void testPojoReaderToCsvNullInput() {
        this.operator.setup((Context.OperatorContext) null);
        this.operator.in.process((Object) null);
        Assert.assertEquals(0L, this.validDataSink.collectedTuples.size());
        Assert.assertEquals(1L, this.invalidDataSink.collectedTuples.size());
        Assert.assertEquals(1L, this.operator.getIncomingTuplesCount());
        Assert.assertEquals(1L, this.operator.getErrorTupleCount());
        Assert.assertEquals(0L, this.operator.getEmittedObjectCount());
    }

    @Test
    public void testApplication() throws IOException, Exception {
        try {
            LocalMode newInstance = LocalMode.newInstance();
            newInstance.prepareDAG(new CsvParserApplication(), new Configuration(false));
            newInstance.getController().run(5000L);
        } catch (ConstraintViolationException e) {
            Assert.fail("constraint violations: " + e.getConstraintViolations());
        }
    }
}
