package com.google.cloud.dataflow.examples.cookbook;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
import com.google.cloud.dataflow.examples.common.PubsubFileInjector;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterEach;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:com/google/cloud/dataflow/examples/cookbook/TriggerExample.class */
public class TriggerExample {
    /**
     * Default value of the {@code windowDuration} pipeline option, which that option describes as
     * the fixed-window duration in minutes. Must remain a compile-time constant because it is
     * used as an annotation argument.
     */
    public static final int WINDOW_DURATION = 30;
    /** One minute; the processing-time delay used by the speculative and sequential triggers. */
    public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
    /** Five minutes; the processing-time delay of the second stage of the sequential trigger. */
    public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
    /** One day; the allowed lateness of every windowing branch except the default one. */
    public static final Duration ONE_DAY = Duration.standardDays(1);
    /** Label key handed to both the Pub/Sub injector and the Pub/Sub reader as the timestamp label. */
    private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";

    /* loaded from: input_file:com/google/cloud/dataflow/examples/cookbook/TriggerExample$CalculateTotalFlow.class */
    /**
     * Applies four differently configured windowing strategies to the same keyed input and
     * returns the four aggregated results as one {@link PCollectionList}, in the order
     * default, withAllowedLateness, speculative, sequential.
     *
     * <p>Every branch uses fixed windows of {@code windowDuration} minutes and is followed by a
     * {@link TotalFlow} tagged with the branch's trigger type.
     */
    static class CalculateTotalFlow extends PTransform<PCollection<KV<String, Integer>>, PCollectionList<TableRow>> {
        /** Fixed-window size, in minutes. */
        private final int windowDuration;

        /**
         * @param windowDuration fixed-window size, in minutes
         */
        CalculateTotalFlow(int windowDuration) {
            this.windowDuration = windowDuration;
        }

        /**
         * Builds the four windowed aggregations.
         *
         * @param flowInfo keyed integer readings to window and aggregate
         * @return the four result collections, one per trigger configuration
         */
        public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
            // "Default": repeated end-of-window watermark trigger, zero allowed lateness,
            // fired panes discarded.
            PCollection<TableRow> defaultResults = flowInfo
                .apply("Default", Window.<KV<String, Integer>>into(fixedWindows())
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
                .apply(new TotalFlow("default"));

            // "WithLateData": same trigger and discarding mode, but allowed lateness of one day.
            PCollection<TableRow> withAllowedLatenessResults = flowInfo
                .apply("WithLateData", Window.<KV<String, Integer>>into(fixedWindows())
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .discardingFiredPanes()
                    .withAllowedLateness(TriggerExample.ONE_DAY))
                .apply(new TotalFlow("withAllowedLateness"));

            // "Speculative": repeated processing-time trigger, one minute after the first
            // element in a pane; fired panes accumulate; allowed lateness of one day.
            PCollection<TableRow> speculativeResults = flowInfo
                .apply("Speculative", Window.<KV<String, Integer>>into(fixedWindows())
                    .triggering(Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TriggerExample.ONE_MINUTE)))
                    .accumulatingFiredPanes()
                    .withAllowedLateness(TriggerExample.ONE_DAY))
                .apply(new TotalFlow("speculative"));

            // "Sequential": two triggers in order. First the one-minute processing-time trigger,
            // repeated until the watermark passes the end of the window; then a five-minute
            // processing-time trigger, repeated. Fired panes accumulate; allowed lateness of one day.
            PCollection<TableRow> sequentialResults = flowInfo
                .apply("Sequential", Window.<KV<String, Integer>>into(fixedWindows())
                    .triggering(AfterEach.inOrder(
                        Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TriggerExample.ONE_MINUTE))
                            .orFinally(AfterWatermark.pastEndOfWindow()),
                        Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TriggerExample.FIVE_MINUTES))))
                    .accumulatingFiredPanes()
                    .withAllowedLateness(TriggerExample.ONE_DAY))
                .apply(new TotalFlow("sequential"));

            return PCollectionList.of(defaultResults)
                .and(withAllowedLatenessResults)
                .and(speculativeResults)
                .and(sequentialResults);
        }

        /** Returns the fixed-window function shared by all four branches. */
        private FixedWindows fixedWindows() {
            return FixedWindows.of(Duration.standardMinutes(this.windowDuration));
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/examples/cookbook/TriggerExample$ExtractFlowInfo.class */
    /**
     * Splits each input line on commas and emits the pair (field 2, field 7 parsed as an integer).
     *
     * <p>A line produces no output when its first field is the literal {@code timestamp}
     * (presumably the CSV header row), when it has fewer than 48 fields, or when field 7 is not
     * a strictly positive integer.
     */
    static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
        ExtractFlowInfo() {
        }

        /**
         * Parses one line and conditionally emits its key/value pair.
         *
         * @param processContext supplies the input line and receives the output pair
         */
        public void processElement(DoFn<String, KV<String, Integer>>.ProcessContext processContext) throws Exception {
            String[] fields = processContext.element().split(",");

            // Skip lines that start with the "timestamp" column name.
            if (fields[0].equals("timestamp")) {
                return;
            }
            // Skip lines that are too short to be a full record.
            if (fields.length < 48) {
                return;
            }

            Integer totalFlow = TriggerExample.tryIntegerParse(fields[7]);
            // Only strictly positive, parseable values are forwarded.
            if (totalFlow != null && totalFlow > 0) {
                processContext.output(KV.of(fields[2], totalFlow));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/examples/cookbook/TriggerExample$FormatTotalFlow.class */
    /**
     * Turns a (key, {@code "<total>,<count>"}) pair into a {@link TableRow} that also records the
     * trigger type, the window, pane details, the element timestamp and the current wall-clock time.
     */
    public static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> implements DoFn.RequiresWindowAccess {
        /** Label written to the {@code trigger_type} column of every row. */
        private String triggerType;

        /**
         * @param str trigger-type label to stamp on each output row
         */
        public FormatTotalFlow(String str) {
            this.triggerType = str;
        }

        /**
         * Builds and emits one row for the current element.
         *
         * @param processContext supplies the element, window, pane and timestamp; receives the row
         */
        public void processElement(DoFn<KV<String, String>, TableRow>.ProcessContext processContext) throws Exception {
            KV<String, String> element = processContext.element();
            // The value is "<total flow>,<number of records>".
            String[] parts = element.getValue().split(",");

            TableRow row = new TableRow();
            row.set("trigger_type", this.triggerType);
            row.set("freeway", element.getKey());
            row.set("total_flow", Integer.parseInt(parts[0]));
            row.set("number_of_records", Long.parseLong(parts[1]));
            row.set("window", processContext.window().toString());
            row.set("isFirst", processContext.pane().isFirst());
            row.set("isLast", processContext.pane().isLast());
            row.set("timing", processContext.pane().getTiming().toString());
            row.set("event_time", processContext.timestamp().toString());
            // Wall-clock time at which this row was formatted.
            row.set("processing_time", Instant.now().toString());

            processContext.output(row);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/examples/cookbook/TriggerExample$InsertDelays.class */
    /**
     * Re-emits every element with the current wall-clock time as its timestamp, except that a
     * random fraction ({@code THRESHOLD}) of elements get a timestamp moved into the past by a
     * random whole number of minutes.
     */
    public static class InsertDelays extends DoFn<String, String> {
        /** Probability that an element's timestamp is moved into the past. */
        private static final double THRESHOLD = 0.001d;
        /** Smallest delay, in minutes (inclusive). */
        private static final int MIN_DELAY = 1;
        /** Upper bound of the delay, in minutes (exclusive). */
        private static final int MAX_DELAY = 100;

        /**
         * Emits the input element unchanged with a current or artificially delayed timestamp.
         *
         * @param processContext supplies the element and receives the timestamped output
         */
        public void processElement(DoFn<String, String>.ProcessContext processContext) throws Exception {
            Instant timestamp = Instant.now();
            boolean delayThisElement = Math.random() < THRESHOLD;
            if (delayThisElement) {
                int delayRange = MAX_DELAY - MIN_DELAY;
                int delayMinutes = MIN_DELAY + (int) (Math.random() * delayRange);
                long delayMillis = TimeUnit.MINUTES.toMillis(delayMinutes);
                timestamp = new Instant(timestamp.getMillis() - delayMillis);
            }
            processContext.outputWithTimestamp(processContext.element(), timestamp);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/examples/cookbook/TriggerExample$TotalFlow.class */
    /**
     * Groups the input by key, reduces each group to the string {@code "<sum>,<count>"}, and
     * formats the result into {@link TableRow}s via {@link FormatTotalFlow}.
     */
    public static class TotalFlow extends PTransform<PCollection<KV<String, Integer>>, PCollection<TableRow>> {
        /** Label forwarded to {@link FormatTotalFlow}. */
        private final String triggerType;

        /**
         * @param triggerType label identifying the trigger configuration that feeds this transform
         */
        public TotalFlow(String triggerType) {
            this.triggerType = triggerType;
        }

        /**
         * Expands this transform.
         *
         * @param flowInfo keyed integer readings
         * @return one row per key and fired pane
         */
        public PCollection<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
            PCollection<KV<String, Iterable<Integer>>> groupedFlows =
                flowInfo.apply(GroupByKey.<String, Integer>create());

            PCollection<KV<String, String>> totals = groupedFlows.apply(
                ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {
                    public void processElement(DoFn<KV<String, Iterable<Integer>>, KV<String, String>>.ProcessContext processContext) throws Exception {
                        KV<String, Iterable<Integer>> element = processContext.element();
                        // The sum stays an int (wrapping on overflow) because FormatTotalFlow
                        // reads it back with Integer.parseInt.
                        int sum = 0;
                        long numberOfRecords = 0L;
                        for (Integer flow : element.getValue()) {
                            sum += flow;
                            numberOfRecords++;
                        }
                        processContext.output(KV.of(element.getKey(), sum + "," + numberOfRecords));
                    }
                }));

            return totals.apply(ParDo.of(new FormatTotalFlow(this.triggerType)));
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/examples/cookbook/TriggerExample$TrafficFlowOptions.class */
    /**
     * Pipeline options specific to this example: the input file to inject and the fixed-window
     * duration, in addition to whatever the three parent option interfaces declare.
     */
    public interface TrafficFlowOptions extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions, DataflowExampleOptions {
        /**
         * Returns the path of the file whose lines are injected into the Pub/Sub topic; defaults
         * to a traffic-sensor CSV under {@code gs://dataflow-samples}.
         */
        @Default.String("gs://dataflow-samples/traffic_sensor/Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
        @Description("Input file to inject to Pub/Sub topic")
        String getInput();

        /** Sets the path of the file to inject. */
        void setInput(String str);

        /** Returns the fixed-window duration in minutes; defaults to {@code WINDOW_DURATION}. */
        @Default.Integer(TriggerExample.WINDOW_DURATION)
        @Description("Numeric value of window duration for fixed windows, in minutes")
        Integer getWindowDuration();

        /** Sets the fixed-window duration in minutes. */
        void setWindowDuration(Integer num);
    }

    /**
     * Entry point. Builds and runs the streaming pipeline that reads from Pub/Sub, extracts flow
     * readings, aggregates them under each trigger configuration and writes every result
     * collection to the same BigQuery table. If an input file is configured, it also starts the
     * injector pipeline, then waits for the main pipeline to finish.
     *
     * @param strArr command-line arguments parsed into {@link TrafficFlowOptions}
     */
    public static void main(String[] strArr) throws Exception {
        TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(strArr)
            .withValidation()
            .as(TrafficFlowOptions.class);
        // Streaming mode and the Dataflow runner are forced regardless of the arguments.
        options.setStreaming(true);
        options.setRunner(DataflowPipelineRunner.class);
        options.setBigQuerySchema(getSchema());

        DataflowExampleUtils exampleUtils = new DataflowExampleUtils(options);
        exampleUtils.setup();

        Pipeline pipeline = Pipeline.create(options);
        TableReference outputTable = getTableReference(
            options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable());

        PCollectionList<TableRow> results = pipeline
            .apply(PubsubIO.Read.named("ReadPubsubInput")
                .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
                .topic(options.getPubsubTopic()))
            .apply(ParDo.of(new ExtractFlowInfo()))
            .apply(new CalculateTotalFlow(options.getWindowDuration().intValue()));

        // Each trigger configuration's output goes to the same table.
        for (int index = 0; index < results.size(); index++) {
            results.get(index).apply(BigQueryIO.Write.to(outputTable).withSchema(getSchema()));
        }

        PipelineResult mainResult = pipeline.run();

        // Only start the injector when there is a file to inject.
        boolean hasInput = !options.getInput().isEmpty();
        if (hasInput) {
            exampleUtils.runInjectorPipeline(runInjector(options));
        }
        exampleUtils.waitToFinish(mainResult);
    }

    /**
     * Builds (but does not run) the batch pipeline that reads the input file, assigns each line
     * a timestamp via {@link InsertDelays}, and publishes the lines to the Pub/Sub topic.
     *
     * @param trafficFlowOptions options of the main pipeline; cloned so they are not modified
     * @return the configured injector pipeline
     */
    private static Pipeline runInjector(TrafficFlowOptions trafficFlowOptions) {
        DataflowPipelineOptions injectorOptions = trafficFlowOptions.cloneAs(DataflowPipelineOptions.class);
        injectorOptions.setStreaming(false);
        injectorOptions.setNumWorkers(
            trafficFlowOptions.as(DataflowExampleOptions.class).getInjectorNumWorkers());
        injectorOptions.setJobName(trafficFlowOptions.getJobName() + "-injector");

        Pipeline injectorPipeline = Pipeline.create(injectorOptions);

        PCollection<String> lines = injectorPipeline.apply(
            TextIO.Read.named("ReadMyFile").from(trafficFlowOptions.getInput()));
        PCollection<String> timestampedLines = lines.apply(
            ParDo.named("InsertRandomDelays").of(new InsertDelays()));
        timestampedLines.apply(
            IntraBundleParallelization
                .of(PubsubFileInjector
                    .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)
                    .publish(trafficFlowOptions.getPubsubTopic()))
                .withMaxParallelism(20));

        return injectorPipeline;
    }

    /**
     * Creates a BigQuery table reference from its three identifying parts.
     *
     * @param str project id
     * @param str2 dataset id
     * @param str3 table id
     * @return a new reference with all three ids set
     */
    private static TableReference getTableReference(String str, String str2, String str3) {
        return new TableReference()
            .setProjectId(str)
            .setDatasetId(str2)
            .setTableId(str3);
    }

    /**
     * Builds the BigQuery schema of the output table: one column per value that
     * {@code FormatTotalFlow} sets on a row, in the same order.
     *
     * @return a new schema instance
     */
    private static TableSchema getSchema() {
        // Column name / BigQuery type pairs, in output order.
        String[][] columns = {
            {"trigger_type", "STRING"},
            {"freeway", "STRING"},
            {"total_flow", "INTEGER"},
            {"number_of_records", "INTEGER"},
            {"window", "STRING"},
            {"isFirst", "BOOLEAN"},
            {"isLast", "BOOLEAN"},
            {"timing", "STRING"},
            {"event_time", "TIMESTAMP"},
            {"processing_time", "TIMESTAMP"},
        };

        ArrayList<TableFieldSchema> fields = new ArrayList<>();
        for (String[] column : columns) {
            fields.add(new TableFieldSchema().setName(column[0]).setType(column[1]));
        }
        return new TableSchema().setFields(fields);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Parses a base-10 integer, reporting failure as {@code null} instead of an exception.
     *
     * @param str text to parse; may be {@code null}
     * @return the parsed value, or {@code null} when {@code str} is not a valid integer
     */
    public static Integer tryIntegerParse(String str) {
        Integer parsed;
        try {
            parsed = Integer.valueOf(str);
        } catch (NumberFormatException ignored) {
            // Unparseable input (including null) is signalled to the caller with null.
            parsed = null;
        }
        return parsed;
    }
}
