package org.apache.beam.sdk.nexmark;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.nexmark.NexmarkOptions;
import org.apache.beam.sdk.nexmark.NexmarkPerf;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel;
import org.apache.beam.sdk.nexmark.queries.Query0;
import org.apache.beam.sdk.nexmark.queries.Query0Model;
import org.apache.beam.sdk.nexmark.queries.Query1;
import org.apache.beam.sdk.nexmark.queries.Query10;
import org.apache.beam.sdk.nexmark.queries.Query11;
import org.apache.beam.sdk.nexmark.queries.Query12;
import org.apache.beam.sdk.nexmark.queries.Query1Model;
import org.apache.beam.sdk.nexmark.queries.Query2;
import org.apache.beam.sdk.nexmark.queries.Query2Model;
import org.apache.beam.sdk.nexmark.queries.Query3;
import org.apache.beam.sdk.nexmark.queries.Query3Model;
import org.apache.beam.sdk.nexmark.queries.Query4;
import org.apache.beam.sdk.nexmark.queries.Query4Model;
import org.apache.beam.sdk.nexmark.queries.Query5;
import org.apache.beam.sdk.nexmark.queries.Query5Model;
import org.apache.beam.sdk.nexmark.queries.Query6;
import org.apache.beam.sdk.nexmark.queries.Query6Model;
import org.apache.beam.sdk.nexmark.queries.Query7;
import org.apache.beam.sdk.nexmark.queries.Query7Model;
import org.apache.beam.sdk.nexmark.queries.Query8;
import org.apache.beam.sdk.nexmark.queries.Query8Model;
import org.apache.beam.sdk.nexmark.queries.Query9;
import org.apache.beam.sdk.nexmark.queries.Query9Model;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdks.java.nexmark.repackaged.com.google.common.base.Ascii;
import org.apache.beam.sdks.java.nexmark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.nexmark.repackaged.com.google.common.base.Strings;
import org.apache.beam.sdks.java.nexmark.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.nexmark.repackaged.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/nexmark/NexmarkLauncher.class */
public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    // NOTE(review): captureSteadyState compares its sample count against a literal 9;
    // presumably that literal is this constant inlined by the compiler — confirm.
    private static final int MIN_SAMPLES = 9;
    // Launcher options, assigned once in the constructor.
    private final OptionT options;

    // Read by monitor() and the source/sink helpers (debug, numEvents, preloadSeconds,
    // usePubsubPublishTime). Not assigned anywhere in the visible part of the file.
    @Nullable
    private NexmarkConfiguration configuration;

    // Not referenced in the visible part of the file.
    @Nullable
    private Monitor<Event> publisherMonitor;

    // Result handle of the publisher pipeline; monitor() cancels and waits on it when non-null.
    @Nullable
    private PipelineResult publisherResult;

    // Result handle of the main pipeline; monitor() polls its state and metrics.
    @Nullable
    private PipelineResult mainResult;

    // Query name used to derive transform names and resource names (topics, files, tables).
    @Nullable
    private String queryName;
    private static final Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
    // Minimum time span the steady-state sample window must cover (see captureSteadyState).
    private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
    // Sleep between two polls of the running job in monitor().
    private static final Duration PERF_DELAY = Duration.standardSeconds(15);
    // Quiet period after which a streaming job that has seen all its events is cancelled.
    private static final Duration DONE_DELAY = Duration.standardMinutes(1);
    // Quiet period after which monitor() reports a streaming job as stuck.
    private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10);
    // Quiet period after which monitor() cancels a streaming job as stuck.
    private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
    // Not referenced in the visible part of the file; presumably the main-output tag paired with SIDE.
    private static final TupleTag<String> MAIN = new TupleTag<String>() { // from class: org.apache.beam.sdk.nexmark.NexmarkLauncher.4
    };
    // Output tag that PartitionDoFn uses for elements with an odd hash code.
    private static final TupleTag<String> SIDE = new TupleTag<String>() { // from class: org.apache.beam.sdk.nexmark.NexmarkLauncher.5
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.sdk.nexmark.NexmarkLauncher$6, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/NexmarkLauncher$6.class */
    public static /* synthetic */ class AnonymousClass6 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$PipelineResult$State;
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SinkType = new int[NexmarkUtils.SinkType.values().length];

        static {
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SinkType[NexmarkUtils.SinkType.DEVNULL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SinkType[NexmarkUtils.SinkType.PUBSUB.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SinkType[NexmarkUtils.SinkType.TEXT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SinkType[NexmarkUtils.SinkType.AVRO.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SinkType[NexmarkUtils.SinkType.BIGQUERY.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SinkType[NexmarkUtils.SinkType.COUNT_ONLY.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SourceType = new int[NexmarkUtils.SourceType.values().length];
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SourceType[NexmarkUtils.SourceType.DIRECT.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SourceType[NexmarkUtils.SourceType.AVRO.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SourceType[NexmarkUtils.SourceType.PUBSUB.ordinal()] = 3;
            } catch (NoSuchFieldError e9) {
            }
            $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$PubSubMode = new int[NexmarkUtils.PubSubMode.values().length];
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$PubSubMode[NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY.ordinal()] = 1;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$PubSubMode[NexmarkUtils.PubSubMode.PUBLISH_ONLY.ordinal()] = 2;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$PubSubMode[NexmarkUtils.PubSubMode.COMBINED.ordinal()] = 3;
            } catch (NoSuchFieldError e12) {
            }
            $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$ResourceNameMode = new int[NexmarkUtils.ResourceNameMode.values().length];
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$ResourceNameMode[NexmarkUtils.ResourceNameMode.VERBATIM.ordinal()] = 1;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$ResourceNameMode[NexmarkUtils.ResourceNameMode.QUERY.ordinal()] = 2;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$ResourceNameMode[NexmarkUtils.ResourceNameMode.QUERY_AND_SALT.ordinal()] = 3;
            } catch (NoSuchFieldError e15) {
            }
            $SwitchMap$org$apache$beam$sdk$PipelineResult$State = new int[PipelineResult.State.values().length];
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.UNKNOWN.ordinal()] = 1;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.STOPPED.ordinal()] = 2;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.DONE.ordinal()] = 4;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.CANCELLED.ordinal()] = 5;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.FAILED.ordinal()] = 6;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.UPDATED.ordinal()] = 7;
            } catch (NoSuchFieldError e22) {
            }
            $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkLauncher$DistributionType = new int[DistributionType.values().length];
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkLauncher$DistributionType[DistributionType.MIN.ordinal()] = 1;
            } catch (NoSuchFieldError e23) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$nexmark$NexmarkLauncher$DistributionType[DistributionType.MAX.ordinal()] = 2;
            } catch (NoSuchFieldError e24) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/NexmarkLauncher$DistributionType.class */
    /** Selects which bound of a distribution metric getDistributionMetric returns. */
    public enum DistributionType {
        // Return the distribution's minimum.
        MIN,
        // Return the distribution's maximum.
        MAX
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/NexmarkLauncher$PartitionDoFn.class */
    /**
     * Splits a stream of strings in two by the parity of each element's hash code:
     * even hashes go to the main output, odd hashes to the {@code SIDE} output.
     */
    public static class PartitionDoFn extends DoFn<String, String> {
        private PartitionDoFn() {
        }

        /** Routes the current element to the main or the side output. */
        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext) {
            String value = processContext.element();
            // The low bit decides parity for negative hash codes as well.
            boolean oddHash = (value.hashCode() & 1) != 0;
            if (oddHash) {
                processContext.output(NexmarkLauncher.SIDE, value);
                return;
            }
            processContext.output(value);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/NexmarkLauncher$PipelineBuilder.class */
    /**
     * Callback that is handed a set of options to build a pipeline from.
     *
     * @param <OptionT> the concrete options type the builder accepts
     */
    public interface PipelineBuilder<OptionT extends NexmarkOptions> {
        /** Invoked with the options to build from. */
        void build(OptionT optiont);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/NexmarkLauncher$StringToTableRow.class */
    /**
     * Wraps each input string in a {@link TableRow} that also carries a random-length
     * (0 to 9 entries) list of nested index/value records.
     */
    public static class StringToTableRow extends DoFn<String, TableRow> {
        private StringToTableRow() {
        }

        /** Emits one row with a {@code result} field and a {@code records} list. */
        @DoFn.ProcessElement
        public void processElement(DoFn<String, TableRow>.ProcessContext processContext) {
            int recordCount = ThreadLocalRandom.current().nextInt(10);
            List<TableRow> records = new ArrayList<>(recordCount);
            int index = 0;
            while (index < recordCount) {
                TableRow record = new TableRow();
                record.set("index", index);
                record.set("value", Integer.toString(index));
                records.add(record);
                index++;
            }
            TableRow row = new TableRow();
            row.set("result", processContext.element());
            row.set("records", records);
            processContext.output(row);
        }
    }

    /**
     * Creates a launcher bound to the given options.
     *
     * @param optiont options stored for use by every other method of this launcher
     */
    public NexmarkLauncher(OptionT optiont) {
        this.options = optiont;
    }

    /** Returns the streaming flag from the launcher options. */
    private boolean isStreaming() {
        return this.options.isStreaming();
    }

    /** Returns the fixed value 5; the options are not consulted. */
    private int maxNumWorkers() {
        return 5;
    }

    /**
     * Reads the attempted value of the first counter matching the given namespace and name.
     *
     * @param pipelineResult result handle whose metrics are queried
     * @param str metric namespace
     * @param str2 metric name
     * @param j value to return when no matching counter exists
     * @return the counter's attempted value, or {@code j} if the query yields no counter
     */
    private long getCounterMetric(PipelineResult pipelineResult, String str, String str2, long j) {
        long value;
        try {
            MetricsFilter filter = MetricsFilter.builder().addNameFilter(MetricNameFilter.named(str, str2)).build();
            // next() throws NoSuchElementException when the query matched nothing.
            MetricResult firstCounter = (MetricResult) pipelineResult.metrics().queryMetrics(filter).counters().iterator().next();
            value = ((Long) firstCounter.attempted()).longValue();
        } catch (NoSuchElementException e) {
            LOG.error("Failed to get metric {}, from namespace {}", str2, str);
            value = j;
        }
        return value;
    }

    /**
     * Reads the minimum or maximum of the first distribution matching the given
     * namespace and name.
     *
     * @param pipelineResult result handle whose metrics are queried
     * @param str metric namespace
     * @param str2 metric name
     * @param distributionType which bound of the distribution to return
     * @param j value to return when no matching distribution exists or the type is unhandled
     * @return the requested bound of the attempted distribution, or {@code j}
     */
    private long getDistributionMetric(PipelineResult pipelineResult, String str, String str2, DistributionType distributionType, long j) {
        long value = j;
        try {
            MetricsFilter filter = MetricsFilter.builder().addNameFilter(MetricNameFilter.named(str, str2)).build();
            // next() throws NoSuchElementException when the query matched nothing.
            MetricResult firstDistribution = (MetricResult) pipelineResult.metrics().queryMetrics(filter).distributions().iterator().next();
            switch (distributionType) {
                case MIN:
                    value = ((DistributionResult) firstDistribution.attempted()).min();
                    break;
                case MAX:
                    value = ((DistributionResult) firstDistribution.attempted()).max();
                    break;
                default:
                    break;
            }
        } catch (NoSuchElementException e) {
            LOG.error("Failed to get distribution metric {} for namespace {}", str2, str);
        }
        return value;
    }

    /**
     * Filters out implausible timestamps.
     *
     * @param j reference time in milliseconds
     * @param j2 candidate timestamp in milliseconds
     * @return {@code j2} when it lies within 10000 days of {@code j}, otherwise -1
     */
    private long getTimestampMetric(long j, long j2) {
        long maxDistanceMs = Duration.standardDays(10000L).getMillis();
        long distanceMs = Math.abs(j2 - j);
        return distanceMs > maxDistanceMs ? -1L : j2;
    }

    /**
     * Re-estimates {@code eventsPerSec} from the middle third of the usable progress
     * snapshots. Only applies in streaming mode.
     *
     * <p>Leading snapshots with a negative event or result count are dropped, as are
     * trailing snapshots that show no activity relative to their predecessor. The estimate
     * is a least-squares slope through the origin of event count against runtime. If fewer
     * than 9 samples remain, or the middle window is shorter than {@code MIN_WINDOW}, a
     * console message is printed and {@code nexmarkPerf} is left untouched.
     *
     * @param nexmarkPerf perf record whose {@code eventsPerSec} is revised
     * @param list progress snapshots collected so far, oldest first
     */
    private void captureSteadyState(NexmarkPerf nexmarkPerf, List<NexmarkPerf.ProgressSnapshot> list) {
        if (!this.options.isStreaming()) {
            return;
        }

        // First snapshot that has both an event count and a result count.
        int dataStart = 0;
        for (; dataStart < list.size(); dataStart++) {
            NexmarkPerf.ProgressSnapshot candidate = list.get(dataStart);
            if (candidate.numEvents >= 0 && candidate.numResults >= 0) {
                break;
            }
        }

        // Last snapshot that still differs from the one before it.
        int dataEnd = list.size() - 1;
        for (; dataEnd > dataStart; dataEnd--) {
            if (list.get(dataEnd).anyActivity(list.get(dataEnd - 1))) {
                break;
            }
        }

        int numSamples = dataEnd - dataStart + 1;
        if (numSamples < 9) {
            NexmarkUtils.console("%d samples not enough to calculate steady-state event rate", Integer.valueOf(numSamples));
            return;
        }

        // Keep only the middle third to avoid ramp-up and ramp-down samples.
        int third = numSamples / 3;
        int sampleStart = dataStart + third;
        int sampleEnd = dataEnd - third;
        double sampleSec = list.get(sampleEnd).secSinceStart - list.get(sampleStart).secSinceStart;
        if (sampleSec < MIN_WINDOW.getStandardSeconds()) {
            NexmarkUtils.console("sample of %.1f sec not long enough to calculate steady-state event rate", Double.valueOf(sampleSec));
            return;
        }

        double sumRuntimeSquared = 0.0d;
        double sumRuntimeTimesEvents = 0.0d;
        long prevNumEvents = -1;
        for (int idx = sampleStart; idx <= sampleEnd; idx++) {
            NexmarkPerf.ProgressSnapshot snapshot = list.get(idx);
            if (snapshot.numEvents == prevNumEvents) {
                // Event count unchanged since the previous sample: contributes nothing.
                continue;
            }
            double runtimeSec = snapshot.runtimeSec;
            prevNumEvents = snapshot.numEvents;
            sumRuntimeSquared += runtimeSec * runtimeSec;
            sumRuntimeTimesEvents += runtimeSec * prevNumEvents;
        }
        double revisedEventsPerSec = sumRuntimeTimesEvents / sumRuntimeSquared;
        NexmarkUtils.console("revising events/sec from %.1f to %.1f", Double.valueOf(nexmarkPerf.eventsPerSec), Double.valueOf(revisedEventsPerSec));
        nexmarkPerf.eventsPerSec = revisedEventsPerSec;
    }

    /**
     * Builds a perf record from the job's current metrics and appends a matching
     * progress snapshot to {@code list}.
     *
     * <p>Counter and distribution metrics that cannot be read come back as -1, and the
     * fields derived from them are left at their defaults.
     *
     * @param j wall-clock time in milliseconds at which monitoring started
     * @param j2 current wall-clock time in milliseconds
     * @param pipelineResult result handle whose metrics are queried
     * @param list snapshot history; one new snapshot is appended
     * @param monitor monitor whose metrics describe the input events
     * @param monitor2 monitor whose metrics describe the query results
     * @return the freshly populated perf record
     */
    private NexmarkPerf currentPerf(long j, long j2, PipelineResult pipelineResult, List<NexmarkPerf.ProgressSnapshot> list, Monitor<?> monitor, Monitor<?> monitor2) {
        NexmarkPerf perf = new NexmarkPerf();

        // Event-side metrics.
        long numEvents = getCounterMetric(pipelineResult, monitor.name, monitor.prefix + ".elements", -1L);
        long numEventBytes = getCounterMetric(pipelineResult, monitor.name, monitor.prefix + ".bytes", -1L);
        long eventStart = getTimestampMetric(j2, getDistributionMetric(pipelineResult, monitor.name, monitor.prefix + ".startTime", DistributionType.MIN, -1L));
        long eventEnd = getTimestampMetric(j2, getDistributionMetric(pipelineResult, monitor.name, monitor.prefix + ".endTime", DistributionType.MAX, -1L));

        // Result-side metrics.
        long numResults = getCounterMetric(pipelineResult, monitor2.name, monitor2.prefix + ".elements", -1L);
        long numResultBytes = getCounterMetric(pipelineResult, monitor2.name, monitor2.prefix + ".bytes", -1L);
        long resultStart = getTimestampMetric(j2, getDistributionMetric(pipelineResult, monitor2.name, monitor2.prefix + ".startTime", DistributionType.MIN, -1L));
        long resultEnd = getTimestampMetric(j2, getDistributionMetric(pipelineResult, monitor2.name, monitor2.prefix + ".endTime", DistributionType.MAX, -1L));
        long timestampStart = getTimestampMetric(j2, getDistributionMetric(pipelineResult, monitor2.name, monitor2.prefix + ".startTimestamp", DistributionType.MIN, -1L));
        long timestampEnd = getTimestampMetric(j2, getDistributionMetric(pipelineResult, monitor2.name, monitor2.prefix + ".endTimestamp", DistributionType.MAX, -1L));

        // Latest known end time. A negative value means neither end time is known, and
        // the larger of the two is negative only when both are.
        long effectiveEnd = Math.max(eventEnd, resultEnd);
        if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
            perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0d;
        }

        boolean haveRuntime = perf.runtimeSec > 0.0d;
        if (numEvents >= 0) {
            perf.numEvents = numEvents;
            if (haveRuntime) {
                perf.eventsPerSec = numEvents / perf.runtimeSec;
            }
        }
        if (numEventBytes >= 0 && haveRuntime) {
            perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
        }
        if (numResults >= 0) {
            perf.numResults = numResults;
            if (haveRuntime) {
                perf.resultsPerSec = numResults / perf.runtimeSec;
            }
        }
        if (numResultBytes >= 0 && haveRuntime) {
            perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
        }

        if (eventStart >= 0) {
            perf.startupDelaySec = (eventStart - j) / 1000.0d;
            if (resultStart >= eventStart) {
                // resultStart >= eventStart >= 0, so both timestamps are known here.
                perf.processingDelaySec = (resultStart - eventStart) / 1000.0d;
            }
        }
        if (timestampStart >= 0 && timestampEnd >= 0 && haveRuntime) {
            double eventTimeSec = (timestampEnd - timestampStart) / 1000.0d;
            perf.timeDilation = eventTimeSec / perf.runtimeSec;
        }
        if (resultEnd >= 0) {
            perf.shutdownDelaySec = (j2 - resultEnd) / 1000.0d;
        }

        // Record the raw counters, including -1 placeholders, for steady-state analysis.
        NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
        snapshot.secSinceStart = (j2 - j) / 1000.0d;
        snapshot.runtimeSec = perf.runtimeSec;
        snapshot.numEvents = numEvents;
        snapshot.numResults = numResults;
        list.add(snapshot);

        captureSteadyState(perf, list);
        return perf;
    }

    /**
     * Invokes the given builder with this launcher's options.
     *
     * @param pipelineBuilder builder to run
     */
    private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> pipelineBuilder) {
        pipelineBuilder.build(this.options);
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:78:0x0298. Please report as an issue. */
    /**
     * Polls the main pipeline until it reaches a terminal state, collecting performance
     * snapshots and error notes along the way.
     *
     * <p>Every {@code PERF_DELAY} the job state is read and, when
     * {@code configuration.debug} is set, its metrics are sampled. The main job (and the
     * publisher job, if any) is cancelled once the configured running time has elapsed.
     * In streaming mode the main job is also cancelled when it reports fatal errors,
     * appears to have finished, or has been quiet for longer than
     * {@code STUCK_TERMINATE_DELAY}.
     *
     * <p>When {@code configuration.debug} is false no metrics are sampled; an otherwise
     * empty perf record carrying only the collected errors and snapshots is returned
     * instead of failing with a {@link NullPointerException}.
     *
     * @param nexmarkQuery query whose event/result monitors and name identify the metrics
     * @return the final perf record, or {@code null} when job monitoring is disabled
     * @throws RuntimeException if cancelling the main or publisher job fails
     */
    @Nullable
    private NexmarkPerf monitor(NexmarkQuery nexmarkQuery) {
        if (!this.options.getMonitorJobs()) {
            return null;
        }
        if (this.configuration.debug) {
            NexmarkUtils.console("Waiting for main pipeline to 'finish'");
        } else {
            NexmarkUtils.console("--debug=false, so job will not self-cancel");
        }

        PipelineResult job = this.mainResult;
        PipelineResult publisherJob = this.publisherResult;
        List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>();
        long startMsSinceEpoch = System.currentTimeMillis();
        long endMsSinceEpoch = -1;
        if (this.options.getRunningTimeMinutes() != null) {
            endMsSinceEpoch = (startMsSinceEpoch + Duration.standardMinutes(this.options.getRunningTimeMinutes().longValue()).getMillis()) - Duration.standardSeconds(this.configuration.preloadSeconds).getMillis();
        }
        long lastActivityMsSinceEpoch = -1;
        NexmarkPerf perf = null;
        boolean waitingForShutdown = false;
        boolean publisherCancelled = false;
        List<String> errors = new ArrayList<>();

        while (true) {
            long now = System.currentTimeMillis();

            // Time-boxed run: cancel both jobs once the deadline has passed.
            if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
                NexmarkUtils.console("Reached end of test, cancelling job");
                try {
                    job.cancel();
                } catch (IOException e) {
                    throw new RuntimeException("Unable to cancel main job: ", e);
                }
                if (this.publisherResult != null) {
                    try {
                        publisherJob.cancel();
                    } catch (IOException e) {
                        throw new RuntimeException("Unable to cancel publisher job: ", e);
                    }
                    publisherCancelled = true;
                }
                waitingForShutdown = true;
            }

            PipelineResult.State state = job.getState();
            NexmarkUtils.console("%s %s%s", state, this.queryName, waitingForShutdown ? " (waiting for shutdown)" : "");

            // Metrics are only sampled in debug mode; currPerf stays null otherwise.
            NexmarkPerf currPerf = this.configuration.debug
                    ? currentPerf(startMsSinceEpoch, now, job, snapshots, nexmarkQuery.eventMonitor, nexmarkQuery.resultMonitor)
                    : null;
            if (perf == null || perf.anyActivity(currPerf)) {
                lastActivityMsSinceEpoch = now;
            }

            if (this.options.isStreaming() && !waitingForShutdown) {
                Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
                long fatalCount = getCounterMetric(job, nexmarkQuery.getName(), "fatal", 0L);
                if (fatalCount > 0) {
                    NexmarkUtils.console("job has fatal errors, cancelling.");
                    errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
                    waitingForShutdown = true;
                } else if (this.configuration.debug
                        && this.configuration.numEvents > 0
                        && currPerf.numEvents == this.configuration.numEvents
                        && currPerf.numResults >= 0
                        && quietFor.isLongerThan(DONE_DELAY)) {
                    NexmarkUtils.console("streaming query appears to have finished, cancelling job.");
                    waitingForShutdown = true;
                } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
                    NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job.");
                    errors.add("Streaming job was cancelled since appeared stuck");
                    waitingForShutdown = true;
                } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) {
                    NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.", quietFor.getStandardMinutes());
                    errors.add(String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
                }
                if (waitingForShutdown) {
                    try {
                        job.cancel();
                    } catch (IOException e) {
                        throw new RuntimeException("Unable to cancel main job: ", e);
                    }
                }
            }

            perf = currPerf;

            boolean running = true;
            switch (state) {
                case DONE:
                    running = false;
                    break;
                case CANCELLED:
                    running = false;
                    if (!waitingForShutdown) {
                        errors.add("Job was unexpectedly cancelled");
                    }
                    break;
                case FAILED:
                case UPDATED:
                    // NOTE(review): FAILED shares the "updated" message with UPDATED; kept as-is.
                    running = false;
                    errors.add("Job was unexpectedly updated");
                    break;
                default:
                    // UNKNOWN, STOPPED, RUNNING: keep polling.
                    break;
            }
            if (!running) {
                break;
            }

            if (lastActivityMsSinceEpoch == now) {
                NexmarkUtils.console("new perf %s", perf);
            } else {
                NexmarkUtils.console("no activity");
            }
            try {
                Thread.sleep(PERF_DELAY.getMillis());
            } catch (InterruptedException e) {
                // The flag is deliberately left cleared: re-asserting it would make every
                // following sleep() return immediately and turn this loop into a busy spin.
                Thread.interrupted();
                NexmarkUtils.console("Interrupted: pipeline is still running");
            }
        }

        if (perf == null) {
            // No metrics were sampled (debug disabled); still report errors and snapshots.
            perf = new NexmarkPerf();
        }
        perf.errors = errors;
        perf.snapshots = snapshots;

        if (this.publisherResult != null) {
            NexmarkUtils.console("Shutting down publisher pipeline.");
            if (!publisherCancelled) {
                try {
                    publisherJob.cancel();
                } catch (IOException e) {
                    throw new RuntimeException("Unable to cancel publisher job: ", e);
                }
            }
            publisherJob.waitUntilFinish(Duration.standardMinutes(5L));
        }
        return perf;
    }

    /**
     * Derives the Pubsub topic name from {@code --pubsubTopic} and the resource name mode.
     *
     * @param j salt appended in {@code QUERY_AND_SALT} mode
     * @return the topic name to use
     * @throws RuntimeException if no topic is configured or the mode is unrecognized
     */
    private String shortTopic(long j) {
        final String baseTopic = this.options.getPubsubTopic();
        if (Strings.isNullOrEmpty(baseTopic)) {
            throw new RuntimeException("Missing --pubsubTopic");
        }
        final NexmarkUtils.ResourceNameMode mode = this.options.getResourceNameMode();
        final String topic;
        switch (mode) {
            case VERBATIM:
                topic = baseTopic;
                break;
            case QUERY:
                topic = String.format("%s_%s_source", baseTopic, this.queryName);
                break;
            case QUERY_AND_SALT:
                topic = String.format("%s_%s_%d_source", baseTopic, this.queryName, j);
                break;
            default:
                throw new RuntimeException("Unrecognized enum " + mode);
        }
        return topic;
    }

    /**
     * Derives the Pubsub subscription name from {@code --pubsubSubscription} and the
     * resource name mode.
     *
     * @param j salt appended in {@code QUERY_AND_SALT} mode
     * @return the subscription name to use
     * @throws RuntimeException if no subscription is configured or the mode is unrecognized
     */
    private String shortSubscription(long j) {
        final String baseSubscription = this.options.getPubsubSubscription();
        if (Strings.isNullOrEmpty(baseSubscription)) {
            throw new RuntimeException("Missing --pubsubSubscription");
        }
        final NexmarkUtils.ResourceNameMode mode = this.options.getResourceNameMode();
        final String subscription;
        switch (mode) {
            case VERBATIM:
                subscription = baseSubscription;
                break;
            case QUERY:
                subscription = String.format("%s_%s_source", baseSubscription, this.queryName);
                break;
            case QUERY_AND_SALT:
                subscription = String.format("%s_%s_%d_source", baseSubscription, this.queryName, j);
                break;
            default:
                throw new RuntimeException("Unrecognized enum " + mode);
        }
        return subscription;
    }

    /**
     * Derives the text output file name from {@code --outputPath} and the resource name mode.
     *
     * @param j salt appended in {@code QUERY_AND_SALT} mode
     * @return the file name to write to
     * @throws RuntimeException if no output path is configured or the mode is unrecognized
     */
    private String textFilename(long j) {
        final String basePath = this.options.getOutputPath();
        if (Strings.isNullOrEmpty(basePath)) {
            throw new RuntimeException("Missing --outputPath");
        }
        final NexmarkUtils.ResourceNameMode mode = this.options.getResourceNameMode();
        final String filename;
        switch (mode) {
            case VERBATIM:
                filename = basePath;
                break;
            case QUERY:
                filename = String.format("%s/nexmark_%s.txt", basePath, this.queryName);
                break;
            case QUERY_AND_SALT:
                filename = String.format("%s/nexmark_%s_%d.txt", basePath, this.queryName, j);
                break;
            default:
                throw new RuntimeException("Unrecognized enum " + mode);
        }
        return filename;
    }

    /**
     * Derives a BigQuery table spec in the {@code nexmark} dataset of the configured
     * project from {@code --bigQueryTable} and the resource name mode.
     *
     * @param j salt appended in {@code QUERY_AND_SALT} mode
     * @param str version suffix included in the table name
     * @return the table spec, of the form {@code project:nexmark.table...}
     * @throws RuntimeException if no table is configured or the mode is unrecognized
     */
    private String tableSpec(long j, String str) {
        final String baseTable = this.options.getBigQueryTable();
        if (Strings.isNullOrEmpty(baseTable)) {
            throw new RuntimeException("Missing --bigQueryTable");
        }
        final NexmarkUtils.ResourceNameMode mode = this.options.getResourceNameMode();
        final String spec;
        switch (mode) {
            case VERBATIM:
                spec = String.format("%s:nexmark.%s_%s", this.options.getProject(), baseTable, str);
                break;
            case QUERY:
                spec = String.format("%s:nexmark.%s_%s_%s", this.options.getProject(), baseTable, this.queryName, str);
                break;
            case QUERY_AND_SALT:
                spec = String.format("%s:nexmark.%s_%s_%s_%d", this.options.getProject(), baseTable, this.queryName, str, j);
                break;
            default:
                throw new RuntimeException("Unrecognized enum " + mode);
        }
        return spec;
    }

    /**
     * Derives the logs directory from {@code --outputPath} and the resource name mode.
     *
     * @param j salt appended in {@code QUERY_AND_SALT} mode
     * @return the directory to write logs to
     * @throws RuntimeException if no output path is configured or the mode is unrecognized
     */
    private String logsDir(long j) {
        final String basePath = this.options.getOutputPath();
        if (Strings.isNullOrEmpty(basePath)) {
            throw new RuntimeException("Missing --outputPath");
        }
        final NexmarkUtils.ResourceNameMode mode = this.options.getResourceNameMode();
        final String dir;
        switch (mode) {
            case VERBATIM:
                dir = basePath;
                break;
            case QUERY:
                dir = String.format("%s/logs_%s", basePath, this.queryName);
                break;
            case QUERY_AND_SALT:
                dir = String.format("%s/logs_%s_%d", basePath, this.queryName, j);
                break;
            default:
                throw new RuntimeException("Unrecognized enum " + mode);
        }
        return dir;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies the synthetic event generator to the pipeline, using the unbounded source
     * in streaming mode and the bounded source otherwise.
     *
     * @param pipeline pipeline to attach the source to
     * @return the generated events
     */
    public PCollection<Event> sourceEventsFromSynthetic(Pipeline pipeline) {
        final Long eventCount = Long.valueOf(this.configuration.numEvents);
        if (!isStreaming()) {
            NexmarkUtils.console("Generating %d events in batch mode", eventCount);
            String boundedName = this.queryName + ".ReadBounded";
            return pipeline.apply(boundedName, NexmarkUtils.batchEventsSource(this.configuration));
        }
        NexmarkUtils.console("Generating %d events in streaming mode", eventCount);
        String unboundedName = this.queryName + ".ReadUnbounded";
        return pipeline.apply(unboundedName, NexmarkUtils.streamEventsSource(this.configuration));
    }

    /**
     * Reads events from the Pubsub subscription derived by {@code shortSubscription} and
     * decodes each message payload with {@code Event.CODER}.
     *
     * <p>Messages are de-duplicated on the {@code NexmarkUtils.PUBSUB_ID} attribute. Unless
     * {@code configuration.usePubsubPublishTime} is set, event time is taken from the
     * {@code NexmarkUtils.PUBSUB_TIMESTAMP} attribute. A message that cannot be decoded is
     * logged, together with the decoding exception, and dropped.
     *
     * @param pipeline pipeline to attach the source to
     * @param j salt used when deriving the subscription name
     * @return the decoded events
     */
    private PCollection<Event> sourceEventsFromPubsub(Pipeline pipeline, long j) {
        String shortSubscription = shortSubscription(j);
        NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
        PubsubIO.Read<PubsubMessage> read = PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription).withIdAttribute(NexmarkUtils.PUBSUB_ID);
        if (!this.configuration.usePubsubPublishTime) {
            read = read.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
        }
        return pipeline.apply(this.queryName + ".ReadPubsubEvents", read).apply(this.queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
            @DoFn.ProcessElement
            public void processElement(DoFn<PubsubMessage, Event>.ProcessContext processContext) {
                byte[] payload = processContext.element().getPayload();
                try {
                    processContext.output((Event) CoderUtils.decodeFromByteArray(Event.CODER, payload));
                } catch (CoderException e) {
                    // Best effort: drop the undecodable message but keep the cause in the log.
                    NexmarkLauncher.LOG.error("Error while decoding Event from pusbSub message: serialization error", e);
                }
            }
        }));
    }

    /**
     * Adds an Avro file source to {@code pipeline}.
     *
     * <p>Reads {@link Event} records from files matching {@code --inputPath} followed by
     * {@code *.avro}, then applies {@code NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA}.
     *
     * @param pipeline pipeline the source is applied to
     * @return the events read from the Avro files
     * @throws RuntimeException if the input path is null or empty
     */
    private PCollection<Event> sourceEventsFromAvro(Pipeline pipeline) {
        final String prefix = this.options.getInputPath();
        if (Strings.isNullOrEmpty(prefix)) {
            throw new RuntimeException("Missing --inputPath");
        }
        NexmarkUtils.console("Reading events from Avro files at %s", prefix);
        return pipeline
                .apply(this.queryName + ".ReadAvroEvents", AvroIO.read(Event.class).from(prefix + "*.avro"))
                .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Encodes each {@link Event} with {@code Event.CODER}, wraps it in a {@link PubsubMessage}
     * with an empty attribute map, and writes it to Pubsub.
     *
     * <p>Messages are written with {@code NexmarkUtils.PUBSUB_ID} as the id attribute. Unless
     * {@code configuration.usePubsubPublishTime} is set, {@code NexmarkUtils.PUBSUB_TIMESTAMP}
     * is used as the timestamp attribute.
     *
     * <p>An event that cannot be encoded is logged (with the causing exception) and dropped;
     * it does not fail the pipeline.
     *
     * @param pCollection events to publish
     * @param j value passed to {@code shortTopic} to derive the topic name
     */
    public void sinkEventsToPubsub(PCollection<Event> pCollection, long j) {
        String topic = shortTopic(j);
        NexmarkUtils.console("Writing events to Pubsub %s", topic);
        PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages()
                .to(topic)
                .withIdAttribute(NexmarkUtils.PUBSUB_ID);
        if (!this.configuration.usePubsubPublishTime) {
            write = write.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
        }
        pCollection
                .apply(this.queryName + ".EventToPubsubMessage", ParDo.of(new DoFn<Event, PubsubMessage>() {
                    @DoFn.ProcessElement
                    public void processElement(ProcessContext processContext) {
                        try {
                            byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, processContext.element());
                            processContext.output(new PubsubMessage(payload, new HashMap<String, String>()));
                        } catch (CoderException e) {
                            // Best effort: skip the event, but keep the cause in the log.
                            // A trailing Throwable argument is logged as the exception by SLF4J.
                            NexmarkLauncher.LOG.error(
                                    "Error while sending Event {} to pusbSub: serialization error",
                                    processContext.element().toString(),
                                    e);
                        }
                    }
                }))
                .apply(this.queryName + ".WritePubsubEvents", write);
    }

    /**
     * Writes formatted query results to Pubsub as strings.
     *
     * <p>Uses {@code NexmarkUtils.PUBSUB_ID} as the id attribute and, unless
     * {@code configuration.usePubsubPublishTime} is set, {@code NexmarkUtils.PUBSUB_TIMESTAMP}
     * as the timestamp attribute.
     *
     * @param pCollection formatted results to publish
     * @param j value passed to {@code shortTopic} to derive the topic name
     */
    private void sinkResultsToPubsub(PCollection<String> pCollection, long j) {
        final String topic = shortTopic(j);
        NexmarkUtils.console("Writing results to Pubsub %s", topic);
        PubsubIO.Write<String> write = PubsubIO.writeStrings()
                .to(topic)
                .withIdAttribute(NexmarkUtils.PUBSUB_ID);
        if (!this.configuration.usePubsubPublishTime) {
            write = write.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
        }
        pCollection.apply(this.queryName + ".WritePubsubResults", write);
    }

    /**
     * Writes the events to Avro files under {@code --outputPath}.
     *
     * <p>Four outputs are produced, each with the {@code .avro} suffix: all events under
     * {@code /event}, and the results of applying {@code NexmarkQuery.JUST_BIDS},
     * {@code JUST_NEW_AUCTIONS} and {@code JUST_NEW_PERSONS} under {@code /bid},
     * {@code /auction} and {@code /person} respectively.
     *
     * @param pCollection events to write
     * @throws RuntimeException if the output path is null or empty
     */
    private void sinkEventsToAvro(PCollection<Event> pCollection) {
        final String baseDir = this.options.getOutputPath();
        if (Strings.isNullOrEmpty(baseDir)) {
            throw new RuntimeException("Missing --outputPath");
        }
        NexmarkUtils.console("Writing events to Avro files at %s", baseDir);

        // Every event, unfiltered.
        pCollection.apply(
                this.queryName + ".WriteAvroEvents",
                AvroIO.write(Event.class).to(baseDir + "/event").withSuffix(".avro"));

        // One additional output per filtered view of the events.
        pCollection
                .apply(NexmarkQuery.JUST_BIDS)
                .apply(
                        this.queryName + ".WriteAvroBids",
                        AvroIO.write(Bid.class).to(baseDir + "/bid").withSuffix(".avro"));
        pCollection
                .apply(NexmarkQuery.JUST_NEW_AUCTIONS)
                .apply(
                        this.queryName + ".WriteAvroAuctions",
                        AvroIO.write(Auction.class).to(baseDir + "/auction").withSuffix(".avro"));
        pCollection
                .apply(NexmarkQuery.JUST_NEW_PERSONS)
                .apply(
                        this.queryName + ".WriteAvroPeople",
                        AvroIO.write(Person.class).to(baseDir + "/person").withSuffix(".avro"));
    }

    /**
     * Writes formatted query results to text files.
     *
     * @param pCollection formatted results to write
     * @param j value passed to {@code textFilename} to derive the output location
     */
    private void sinkResultsToText(PCollection<String> pCollection, long j) {
        final String destination = textFilename(j);
        NexmarkUtils.console("Writing results to text files at %s", destination);
        final String stepName = this.queryName + ".WriteTextResults";
        pCollection.apply(stepName, TextIO.write().to(destination));
    }

    /**
     * Writes formatted query results to a BigQuery table.
     *
     * <p>Each string is converted with {@code StringToTableRow} and appended to the table
     * named by {@code tableSpec(j, str)}, which is created if needed. The table schema has a
     * {@code result} STRING field and a repeated {@code records} RECORD field holding an
     * INTEGER {@code index} and a STRING {@code value}.
     *
     * @param pCollection formatted results to write
     * @param j value passed to {@code tableSpec} to derive the table name
     * @param str value passed to {@code tableSpec} to derive the table name
     */
    private void sinkResultsToBigQuery(PCollection<String> pCollection, long j, String str) {
        final String table = tableSpec(j, str);

        TableFieldSchema resultField = new TableFieldSchema().setName("result").setType("STRING");
        TableFieldSchema recordsField = new TableFieldSchema()
                .setName("records")
                .setMode("REPEATED")
                .setType("RECORD")
                .setFields(ImmutableList.of(
                        new TableFieldSchema().setName("index").setType("INTEGER"),
                        new TableFieldSchema().setName("value").setType("STRING")));
        TableSchema schema = new TableSchema().setFields(ImmutableList.of(resultField, recordsField));

        NexmarkUtils.console("Writing results to BigQuery table %s", table);
        pCollection
                .apply(this.queryName + ".StringToTableRow", ParDo.of(new StringToTableRow()))
                .apply(
                        this.queryName + ".WriteBigQueryResults",
                        BigQueryIO.write()
                                .to(table)
                                .withSchema(schema)
                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    }

    /**
     * Creates the event source selected by {@code configuration.sourceType}.
     *
     * <p>{@code DIRECT} and {@code AVRO} return the synthetic and Avro sources respectively.
     * For {@code PUBSUB} the behaviour depends on {@code configuration.pubSubMode}:
     * <ul>
     *   <li>{@code PUBLISH_ONLY}: synthetic events are sent to Pubsub from {@code pipeline};
     *       no source is returned.
     *   <li>{@code COMBINED}: a separate publishing pipeline is built and run through
     *       {@code invokeBuilderForPublishOnlyPipeline}, and {@code pipeline} reads from Pubsub.
     *   <li>{@code SUBSCRIBE_ONLY}: {@code pipeline} reads from Pubsub.
     * </ul>
     *
     * @param pipeline the main pipeline
     * @param j value forwarded to the Pubsub source and sink methods
     * @return the event source, or {@code null} when no branch produces one
     */
    private PCollection<Event> createSource(Pipeline pipeline, final long j) {
        switch (this.configuration.sourceType) {
            case DIRECT:
                return sourceEventsFromSynthetic(pipeline);
            case AVRO:
                return sourceEventsFromAvro(pipeline);
            case PUBSUB:
                break;
            default:
                return null;
        }

        // First set up whatever publishes events to Pubsub.
        switch (this.configuration.pubSubMode) {
            case PUBLISH_ONLY:
                sinkEventsToPubsub(
                        (PCollection) sourceEventsFromSynthetic(pipeline)
                                .apply(this.queryName + ".Snoop", NexmarkUtils.snoop(this.queryName)),
                        j);
                break;
            case COMBINED:
                invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() {
                    @Override
                    public void build(NexmarkOptions nexmarkOptions) {
                        Pipeline publisher = Pipeline.create(NexmarkLauncher.this.options);
                        NexmarkUtils.setupPipeline(NexmarkLauncher.this.configuration.coderStrategy, publisher);
                        NexmarkLauncher.this.publisherMonitor =
                                new Monitor(NexmarkLauncher.this.queryName, "publisher");
                        NexmarkLauncher.this.sinkEventsToPubsub(
                                NexmarkLauncher.this.sourceEventsFromSynthetic(publisher).apply(
                                        NexmarkLauncher.this.queryName + ".Monitor",
                                        NexmarkLauncher.this.publisherMonitor.getTransform()),
                                j);
                        NexmarkLauncher.this.publisherResult = publisher.run();
                    }
                });
                break;
        }

        // Then set up whatever consumes events from Pubsub.
        switch (this.configuration.pubSubMode) {
            case SUBSCRIBE_ONLY:
            case COMBINED:
                return sourceEventsFromPubsub(pipeline, j);
            default:
                return null;
        }
    }

    /**
     * Sends the query results to the sink selected by {@code configuration.sinkType}.
     *
     * <p>For {@code COUNT_ONLY} the raw results go straight to a dev-null transform. Otherwise
     * the results are formatted to strings, optionally logged when {@code --logResults} is set,
     * and then dispatched on the sink type.
     *
     * @param pCollection timestamped query results
     * @param j value forwarded to the Pubsub, text and BigQuery sink methods
     */
    private void sink(PCollection<TimestampedValue<KnownSize>> pCollection, long j) {
        if (this.configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) {
            pCollection.apply(this.queryName + ".DevNull", NexmarkUtils.devNull(this.queryName));
            return;
        }

        PCollection<String> formatted =
                (PCollection) pCollection.apply(this.queryName + ".Format", NexmarkUtils.format(this.queryName));
        if (this.options.getLogResults()) {
            formatted = (PCollection) formatted.apply(
                    this.queryName + ".Results.Log", NexmarkUtils.log(this.queryName + ".Results"));
        }

        // The case indices come from a compiler-generated switch map over SinkType that is
        // declared outside this method; which constant each index stands for is not visible here.
        switch (AnonymousClass6.$SwitchMap$org$apache$beam$sdk$nexmark$NexmarkUtils$SinkType[this.configuration.sinkType.ordinal()]) {
            case 1:
                formatted.apply(this.queryName + ".DevNull", NexmarkUtils.devNull(this.queryName));
                break;
            case 2:
                sinkResultsToPubsub(formatted, j);
                break;
            case 3:
                sinkResultsToText(formatted, j);
                break;
            case 4:
                NexmarkUtils.console("WARNING: with --sinkType=AVRO, actual query results will be discarded.", new Object[0]);
                break;
            case 5: {
                // Results are written three times: the two partitions and an unpartitioned copy.
                PCollectionTuple partitioned = formatted.apply(
                        this.queryName + ".Partition",
                        ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
                sinkResultsToBigQuery(partitioned.get(MAIN), j, "main");
                sinkResultsToBigQuery(partitioned.get(SIDE), j, "side");
                sinkResultsToBigQuery(formatted, j, "copy");
                break;
            }
            case 6:
                throw new RuntimeException();
            default:
                break;
        }
    }

    /**
     * Prints summary statistics of the model's results-per-window samples to the console.
     *
     * <p>The samples from {@code nexmarkQueryModel.simulator().resultsPerWindow()} are sorted.
     * With fewer than five samples only the sample count is printed. Otherwise the count,
     * minimum, first quartile, median, third quartile and maximum are printed.
     *
     * @param nexmarkQueryModel model whose simulator provides the samples
     */
    private void modelResultRates(NexmarkQueryModel nexmarkQueryModel) {
        List<Long> counts = Lists.newArrayList(nexmarkQueryModel.simulator().resultsPerWindow());
        Collections.sort(counts);
        int n = counts.size();
        if (n < 5) {
            NexmarkUtils.console("Query%d: only %d samples", nexmarkQueryModel.configuration.query, n);
            return;
        }
        // The element at n / 2 of the sorted samples is the median, so it is labelled as such
        // (it was previously printed as "mean").
        NexmarkUtils.console(
                "Query%d: N:%d; min:%d; 1st%%:%d; median:%d; 3rd%%:%d; max:%d",
                nexmarkQueryModel.configuration.query,
                n,
                counts.get(0),
                counts.get(n / 4),
                counts.get(n / 2),
                counts.get((n - 1) - (n / 4)),
                counts.get(n - 1));
    }

    /**
     * Runs the query described by {@code nexmarkConfiguration} and returns its performance.
     *
     * <p>The launcher's {@code configuration} and {@code queryName} fields are set for the
     * duration of the call and cleared again on every exit path, normal or exceptional.
     *
     * @param nexmarkConfiguration configuration to run; its {@code query} field indexes the
     *     list of known queries
     * @return the result of {@code monitor} for the executed query, or {@code null} when the
     *     configuration is disabled ({@code numEvents < 0}) or only the model result rates
     *     were requested
     * @throws RuntimeException if {@code --manageResources} is used without
     *     {@code --monitorJobs}, or if a model is required but the query has none
     * @throws IllegalStateException if a configuration or query name is already set
     */
    @Nullable
    public NexmarkPerf run(NexmarkConfiguration nexmarkConfiguration) {
        if (this.options.getManageResources() && !this.options.getMonitorJobs()) {
            throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
        }
        Preconditions.checkState(this.configuration == null);
        Preconditions.checkState(this.queryName == null);
        this.configuration = nexmarkConfiguration;
        try {
            NexmarkUtils.console("Running %s", this.configuration.toShortString());
            if (this.configuration.numEvents < 0) {
                NexmarkUtils.console("skipping since configuration is disabled", new Object[0]);
                return null;
            }

            List<NexmarkQuery> queries = Arrays.asList(new Query0(this.configuration), new Query1(this.configuration), new Query2(this.configuration), new Query3(this.configuration), new Query4(this.configuration), new Query5(this.configuration), new Query6(this.configuration), new Query7(this.configuration), new Query8(this.configuration), new Query9(this.configuration), new Query10(this.configuration), new Query11(this.configuration), new Query12(this.configuration));
            NexmarkQuery nexmarkQuery = queries.get(this.configuration.query);
            this.queryName = nexmarkQuery.getName();

            // Queries 10 to 12 have no model, hence the trailing nulls.
            List<NexmarkQueryModel> models = Arrays.asList(new Query0Model(this.configuration), new Query1Model(this.configuration), new Query2Model(this.configuration), new Query3Model(this.configuration), new Query4Model(this.configuration), new Query5Model(this.configuration), new Query6Model(this.configuration), new Query7Model(this.configuration), new Query8Model(this.configuration), new Query9Model(this.configuration), null, null, null);
            NexmarkQueryModel nexmarkQueryModel = models.get(this.configuration.query);

            if (this.options.getJustModelResultRate()) {
                if (nexmarkQueryModel == null) {
                    throw new RuntimeException(String.format("No model for %s", this.queryName));
                }
                modelResultRates(nexmarkQueryModel);
                return null;
            }

            // The start time doubles as the salt for resource names.
            long currentTimeMillis = System.currentTimeMillis();
            Pipeline create = Pipeline.create(this.options);
            NexmarkUtils.setupPipeline(this.configuration.coderStrategy, create);
            PCollection<Event> createSource = createSource(create, currentTimeMillis);

            // createSource returns null when it produces no source for the main pipeline
            // (e.g. PUBSUB source in PUBLISH_ONLY mode); then there is nothing more to add.
            if (createSource != null) {
                // Event logging must stay inside the null check: applying it to a null
                // source used to throw a NullPointerException when --logEvents was set.
                if (this.options.getLogEvents()) {
                    createSource = (PCollection) createSource.apply(this.queryName + ".Events.Log", NexmarkUtils.log(this.queryName + ".Events"));
                }
                if (this.configuration.sinkType == NexmarkUtils.SinkType.AVRO) {
                    sinkEventsToAvro(createSource);
                }
                if (this.configuration.query == 10) {
                    String str = null;
                    if (this.options.getOutputPath() != null && !this.options.getOutputPath().isEmpty()) {
                        str = logsDir(currentTimeMillis);
                    }
                    Query10 query10 = (Query10) nexmarkQuery;
                    query10.setOutputPath(str);
                    query10.setMaxNumWorkers(maxNumWorkers());
                }
                PCollection<TimestampedValue<KnownSize>> pCollection = (PCollection) createSource.apply(nexmarkQuery);
                if (this.options.getAssertCorrectness()) {
                    if (nexmarkQueryModel == null) {
                        throw new RuntimeException(String.format("No model for %s", this.queryName));
                    }
                    pCollection.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
                    PAssert.that(pCollection).satisfies(nexmarkQueryModel.assertionFor());
                }
                sink(pCollection, currentTimeMillis);
            }

            this.mainResult = create.run();
            this.mainResult.waitUntilFinish(Duration.standardSeconds(this.configuration.streamTimeout));
            return monitor(nexmarkQuery);
        } finally {
            // Leave the launcher ready for the next configuration on every exit path.
            this.configuration = null;
            this.queryName = null;
        }
    }
}
