package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.examples.cookbook.TriggerExample;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExampleTest.class */
public class TriggerExampleTest {
    private static final String[] INPUT = {"01/01/2010 00:00:00,1108302,94,E,ML,36,100,29,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", "01/01/2010 00:00:00,1100333,5,N,FR,9,0,39,,,9,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,"};
    private static final List<TimestampedValue<String>> TIME_STAMPED_INPUT = Arrays.asList(TimestampedValue.of("01/01/2010 00:00:00,1108302,5,W,ML,36,100,30,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,87.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", new Instant(60000)), TimestampedValue.of("01/01/2010 00:00:00,1108302,110,E,ML,36,100,40,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", new Instant(1)), TimestampedValue.of("01/01/2010 00:00:00,1108302,110,E,ML,36,100,50,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0,,,,,0,,,,,0", new Instant(1)));
    private static final TableRow OUT_ROW_1 = new TableRow().set("trigger_type", "default").set("freeway", "5").set("total_flow", 30).set("number_of_records", 1).set("isFirst", true).set("isLast", true).set("timing", "ON_TIME").set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
    private static final TableRow OUT_ROW_2 = new TableRow().set("trigger_type", "default").set("freeway", "110").set("total_flow", 90).set("number_of_records", 2).set("isFirst", true).set("isLast", true).set("timing", "ON_TIME").set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");

    /* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExampleTest$FormatResults.class */
    static class FormatResults extends DoFn<TableRow, String> {
        FormatResults() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<TableRow, String>.ProcessContext processContext) throws Exception {
            TableRow tableRow = (TableRow) processContext.element();
            processContext.output(TriggerExampleTest.canonicalFormat(new TableRow().set("trigger_type", tableRow.get("trigger_type")).set("freeway", tableRow.get("freeway")).set("total_flow", tableRow.get("total_flow")).set("number_of_records", tableRow.get("number_of_records")).set("isFirst", tableRow.get("isFirst")).set("isLast", tableRow.get("isLast")).set("timing", tableRow.get("timing")).set("window", tableRow.get("window"))));
        }
    }

    @Test
    public void testExtractTotalFlow() throws Exception {
        List processBundle = DoFnTester.of(new TriggerExample.ExtractFlowInfo()).processBundle(INPUT);
        Assert.assertEquals(processBundle.size(), 1L);
        Assert.assertEquals(((KV) processBundle.get(0)).getKey(), "94");
        Assert.assertEquals(((KV) processBundle.get(0)).getValue(), new Integer(29));
        Assert.assertEquals(r0.processBundle(new String[]{""}).size(), 0L);
    }

    @Test
    @Category({RunnableOnService.class})
    public void testTotalFlow() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.timestamped(TIME_STAMPED_INPUT)).apply(ParDo.of(new TriggerExample.ExtractFlowInfo())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L)))).apply(new TriggerExample.TotalFlow("default")).apply(ParDo.of(new FormatResults()))).containsInAnyOrder(new String[]{canonicalFormat(OUT_ROW_1), canonicalFormat(OUT_ROW_2)});
        create.run().waitUntilFinish();
    }

    static String canonicalFormat(TableRow tableRow) {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(tableRow.size());
        for (Map.Entry entry : tableRow.entrySet()) {
            newArrayListWithCapacity.add(((String) entry.getKey()) + ":" + entry.getValue());
        }
        Collections.sort(newArrayListWithCapacity);
        return Joiner.on(",").join(newArrayListWithCapacity);
    }
}
