package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExample.class */
public class TriggerExample {
    public static final int WINDOW_DURATION = 30;
    public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
    public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
    public static final Duration ONE_DAY = Duration.standardDays(1);

    /* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExample$CalculateTotalFlow.class */
    static class CalculateTotalFlow extends PTransform<PCollection<KV<String, Integer>>, PCollectionList<TableRow>> {
        private int windowDuration;

        CalculateTotalFlow(int i) {
            this.windowDuration = i;
        }

        public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> pCollection) {
            PCollection apply = pCollection.apply("Default", Window.into(FixedWindows.of(Duration.standardMinutes(this.windowDuration))).triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.ZERO).discardingFiredPanes()).apply(new TotalFlow("default"));
            return PCollectionList.of(apply).and(pCollection.apply("WithLateData", Window.into(FixedWindows.of(Duration.standardMinutes(this.windowDuration))).triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).discardingFiredPanes().withAllowedLateness(TriggerExample.ONE_DAY)).apply(new TotalFlow("withAllowedLateness"))).and(pCollection.apply("Speculative", Window.into(FixedWindows.of(Duration.standardMinutes(this.windowDuration))).triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TriggerExample.ONE_MINUTE))).accumulatingFiredPanes().withAllowedLateness(TriggerExample.ONE_DAY)).apply(new TotalFlow("speculative"))).and(pCollection.apply("Sequential", Window.into(FixedWindows.of(Duration.standardMinutes(this.windowDuration))).triggering(AfterEach.inOrder(new Trigger[]{Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TriggerExample.ONE_MINUTE)).orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TriggerExample.FIVE_MINUTES))})).accumulatingFiredPanes().withAllowedLateness(TriggerExample.ONE_DAY)).apply(new TotalFlow("sequential")));
        }
    }

    /* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExample$ExtractFlowInfo.class */
    static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
        ExtractFlowInfo() {
        }

        public void processElement(DoFn<String, KV<String, Integer>>.ProcessContext processContext) throws Exception {
            String[] split = ((String) processContext.element()).split(",");
            if (!split[0].equals("timestamp") && split.length >= 48) {
                String str = split[2];
                Integer tryIntegerParse = TriggerExample.tryIntegerParse(split[7]);
                if (tryIntegerParse == null || tryIntegerParse.intValue() <= 0) {
                    return;
                }
                processContext.output(KV.of(str, tryIntegerParse));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExample$FormatTotalFlow.class */
    public static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> implements DoFn.RequiresWindowAccess {
        private String triggerType;

        public FormatTotalFlow(String str) {
            this.triggerType = str;
        }

        public void processElement(DoFn<KV<String, String>, TableRow>.ProcessContext processContext) throws Exception {
            String[] split = ((String) ((KV) processContext.element()).getValue()).split(",");
            processContext.output(new TableRow().set("trigger_type", this.triggerType).set("freeway", ((KV) processContext.element()).getKey()).set("total_flow", Integer.valueOf(Integer.parseInt(split[0]))).set("number_of_records", Long.valueOf(Long.parseLong(split[1]))).set("window", processContext.window().toString()).set("isFirst", Boolean.valueOf(processContext.pane().isFirst())).set("isLast", Boolean.valueOf(processContext.pane().isLast())).set("timing", processContext.pane().getTiming().toString()).set("event_time", processContext.timestamp().toString()).set("processing_time", Instant.now().toString()));
        }
    }

    /* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExample$InsertDelays.class */
    public static class InsertDelays extends DoFn<String, String> {
        private static final double THRESHOLD = 0.001d;
        private static final int MIN_DELAY = 1;
        private static final int MAX_DELAY = 100;

        public void processElement(DoFn<String, String>.ProcessContext processContext) throws Exception {
            Instant now = Instant.now();
            if (Math.random() < THRESHOLD) {
                now = new Instant(now.getMillis() - TimeUnit.MINUTES.toMillis(((int) (Math.random() * 99)) + MIN_DELAY));
            }
            processContext.outputWithTimestamp(processContext.element(), now);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExample$TotalFlow.class */
    public static class TotalFlow extends PTransform<PCollection<KV<String, Integer>>, PCollection<TableRow>> {
        private String triggerType;

        public TotalFlow(String str) {
            this.triggerType = str;
        }

        public PCollection<TableRow> apply(PCollection<KV<String, Integer>> pCollection) {
            return pCollection.apply(GroupByKey.create()).apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, String>>() { // from class: org.apache.beam.examples.cookbook.TriggerExample.TotalFlow.1
                public void processElement(DoFn<KV<String, Iterable<Integer>>, KV<String, String>>.ProcessContext processContext) throws Exception {
                    Integer num = 0;
                    Long l = 0L;
                    Iterator it = ((Iterable) ((KV) processContext.element()).getValue()).iterator();
                    while (it.hasNext()) {
                        num = Integer.valueOf(num.intValue() + ((Integer) it.next()).intValue());
                        l = Long.valueOf(l.longValue() + 1);
                    }
                    processContext.output(KV.of(((KV) processContext.element()).getKey(), num + "," + l));
                }
            })).apply(ParDo.of(new FormatTotalFlow(this.triggerType)));
        }
    }

    /* loaded from: input_file:org/apache/beam/examples/cookbook/TriggerExample$TrafficFlowOptions.class */
    public interface TrafficFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
        @Default.String("gs://dataflow-samples/traffic_sensor/Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
        @Description("Input file to read from")
        String getInput();

        void setInput(String str);

        @Description("Numeric value of window duration for fixed windows, in minutes")
        @Default.Integer(TriggerExample.WINDOW_DURATION)
        Integer getWindowDuration();

        void setWindowDuration(Integer num);
    }

    public static void main(String[] strArr) throws Exception {
        TrafficFlowOptions trafficFlowOptions = (TrafficFlowOptions) PipelineOptionsFactory.fromArgs(strArr).withValidation().as(TrafficFlowOptions.class);
        trafficFlowOptions.setStreaming(true);
        trafficFlowOptions.setBigQuerySchema(getSchema());
        ExampleUtils exampleUtils = new ExampleUtils(trafficFlowOptions);
        exampleUtils.setup();
        Pipeline create = Pipeline.create(trafficFlowOptions);
        TableReference tableReference = getTableReference(trafficFlowOptions.getProject(), trafficFlowOptions.getBigQueryDataset(), trafficFlowOptions.getBigQueryTable());
        PCollectionList apply = create.apply("ReadMyFile", TextIO.Read.from(trafficFlowOptions.getInput())).apply("InsertRandomDelays", ParDo.of(new InsertDelays())).apply(ParDo.of(new ExtractFlowInfo())).apply(new CalculateTotalFlow(trafficFlowOptions.getWindowDuration().intValue()));
        for (int i = 0; i < apply.size(); i++) {
            apply.get(i).apply(BigQueryIO.Write.to(tableReference).withSchema(getSchema()));
        }
        exampleUtils.waitToFinish(create.run());
    }

    private static TableReference getTableReference(String str, String str2, String str3) {
        TableReference tableReference = new TableReference();
        tableReference.setProjectId(str);
        tableReference.setDatasetId(str2);
        tableReference.setTableId(str3);
        return tableReference;
    }

    private static TableSchema getSchema() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TableFieldSchema().setName("trigger_type").setType("STRING"));
        arrayList.add(new TableFieldSchema().setName("freeway").setType("STRING"));
        arrayList.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
        arrayList.add(new TableFieldSchema().setName("number_of_records").setType("INTEGER"));
        arrayList.add(new TableFieldSchema().setName("window").setType("STRING"));
        arrayList.add(new TableFieldSchema().setName("isFirst").setType("BOOLEAN"));
        arrayList.add(new TableFieldSchema().setName("isLast").setType("BOOLEAN"));
        arrayList.add(new TableFieldSchema().setName("timing").setType("STRING"));
        arrayList.add(new TableFieldSchema().setName("event_time").setType("TIMESTAMP"));
        arrayList.add(new TableFieldSchema().setName("processing_time").setType("TIMESTAMP"));
        return new TableSchema().setFields(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Integer tryIntegerParse(String str) {
        try {
            return Integer.valueOf(Integer.parseInt(str));
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
