package org.apache.beam.sdk.nexmark.queries;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.Done;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/nexmark/queries/Query10.class */
public class Query10 extends NexmarkQueryTransform<Done> {
    private static final int NUM_SHARDS_PER_WORKER = 5;
    private final NexmarkConfiguration configuration;

    @Nullable
    private String outputPath;
    private int maxNumWorkers;
    private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
    private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.sdk.nexmark.queries.Query10$5, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/queries/Query10$5.class */
    public static /* synthetic */ class AnonymousClass5 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing = new int[PaneInfo.Timing.values().length];

        static {
            try {
                $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[PaneInfo.Timing.EARLY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[PaneInfo.Timing.ON_TIME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[PaneInfo.Timing.LATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[PaneInfo.Timing.UNKNOWN.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/queries/Query10$OutputFile.class */
    public static class OutputFile implements Serializable {
        private final Instant maxTimestamp;
        private final String shard;
        private final long index;
        private final PaneInfo.Timing timing;

        @Nullable
        private final String filename;

        public OutputFile(Instant instant, String str, long j, PaneInfo.Timing timing, @Nullable String str2) {
            this.maxTimestamp = instant;
            this.shard = str;
            this.index = j;
            this.timing = timing;
            this.filename = str2;
        }

        public String toString() {
            return String.format("%s %s %d %s %s%n", this.maxTimestamp, this.shard, Long.valueOf(this.index), this.timing, this.filename);
        }
    }

    public Query10(NexmarkConfiguration nexmarkConfiguration) {
        super("Query10");
        this.configuration = nexmarkConfiguration;
    }

    public void setOutputPath(@Nullable String str) {
        this.outputPath = str;
    }

    public void setMaxNumWorkers(int i) {
        this.maxNumWorkers = i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public WritableByteChannel openWritableGcsFile(GcsOptions gcsOptions, String str) throws IOException {
        throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
    }

    private String timingToString(PaneInfo.Timing timing) {
        switch (AnonymousClass5.$SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[timing.ordinal()]) {
            case 1:
                return "E";
            case 2:
                return "O";
            case 3:
                return "L";
            case 4:
                return "U";
            default:
                throw new RuntimeException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OutputFile outputFileFor(BoundedWindow boundedWindow, String str, PaneInfo paneInfo) {
        return new OutputFile(boundedWindow.maxTimestamp(), str, paneInfo.getIndex(), paneInfo.getTiming(), this.outputPath == null ? null : String.format("%s/LOG-%s-%s-%03d-%s-%x", this.outputPath, boundedWindow.maxTimestamp(), str, Long.valueOf(paneInfo.getIndex()), timingToString(paneInfo.getTiming()), Long.valueOf(ThreadLocalRandom.current().nextLong())));
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Nullable
    public String indexPathFor(BoundedWindow boundedWindow) {
        if (this.outputPath == null) {
            return null;
        }
        return String.format("%s/INDEX-%s", this.outputPath, boundedWindow.maxTimestamp());
    }

    public PCollection<Done> expand(PCollection<Event> pCollection) {
        final int i = this.maxNumWorkers * 5;
        return pCollection.apply(this.name + ".ShardEvents", ParDo.of(new DoFn<Event, KV<String, Event>>() { // from class: org.apache.beam.sdk.nexmark.queries.Query10.1
            private final Counter lateCounter;
            private final Counter onTimeCounter;

            {
                this.lateCounter = Metrics.counter(Query10.this.name, "actuallyLateEvent");
                this.onTimeCounter = Metrics.counter(Query10.this.name, "onTimeCounter");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<Event, KV<String, Event>>.ProcessContext processContext) {
                if (((Event) processContext.element()).hasAnnotation("LATE")) {
                    this.lateCounter.inc();
                    Query10.LOG.info("Observed late: %s", processContext.element());
                } else {
                    this.onTimeCounter.inc();
                }
                processContext.output(KV.of(String.format("shard-%05d-of-%05d", Integer.valueOf((int) Math.abs(((Event) processContext.element()).hashCode() % i)), Integer.valueOf(i)), (Event) processContext.element()));
            }
        })).apply(this.name + ".WindowEvents", Window.into(FixedWindows.of(Duration.standardSeconds(this.configuration.windowSizeSec))).triggering(AfterEach.inOrder(new Trigger[]{Repeatedly.forever(AfterPane.elementCountAtLeast(this.configuration.maxLogEvents)).orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever(AfterFirst.of(new Trigger.OnceTrigger[]{AfterPane.elementCountAtLeast(this.configuration.maxLogEvents), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_BATCHING_PERIOD)}))})).discardingFiredPanes().withAllowedLateness(Duration.standardDays(1L))).apply(this.name + ".GroupByKey", GroupByKey.create()).apply(this.name + ".CheckForLateEvents", ParDo.of(new DoFn<KV<String, Iterable<Event>>, KV<String, Iterable<Event>>>() { // from class: org.apache.beam.sdk.nexmark.queries.Query10.2
            private final Counter earlyCounter;
            private final Counter onTimeCounter;
            private final Counter lateCounter;
            private final Counter unexpectedLatePaneCounter;
            private final Counter unexpectedOnTimeElementCounter;

            {
                this.earlyCounter = Metrics.counter(Query10.this.name, "earlyShard");
                this.onTimeCounter = Metrics.counter(Query10.this.name, "onTimeShard");
                this.lateCounter = Metrics.counter(Query10.this.name, "lateShard");
                this.unexpectedLatePaneCounter = Metrics.counter(Query10.this.name, "ERROR_unexpectedLatePane");
                this.unexpectedOnTimeElementCounter = Metrics.counter(Query10.this.name, "ERROR_unexpectedOnTimeElement");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Iterable<Event>>, KV<String, Iterable<Event>>>.ProcessContext processContext, BoundedWindow boundedWindow) {
                int i2 = 0;
                int i3 = 0;
                Iterator it = ((Iterable) ((KV) processContext.element()).getValue()).iterator();
                while (it.hasNext()) {
                    if (((Event) it.next()).hasAnnotation("LATE")) {
                        i2++;
                    } else {
                        i3++;
                    }
                }
                String str = (String) ((KV) processContext.element()).getKey();
                Query10.LOG.info(String.format("%s with timestamp %s has %d actually late and %d on-time elements in pane %s for window %s", str, processContext.timestamp(), Integer.valueOf(i2), Integer.valueOf(i3), processContext.pane(), boundedWindow.maxTimestamp()));
                if (processContext.pane().getTiming() == PaneInfo.Timing.LATE) {
                    if (i2 == 0) {
                        Query10.LOG.error("ERROR! No late events in late pane for %s", str);
                        this.unexpectedLatePaneCounter.inc();
                    }
                    if (i3 > 0) {
                        Query10.LOG.error("ERROR! Have %d on-time events in late pane for %s", Integer.valueOf(i3), str);
                        this.unexpectedOnTimeElementCounter.inc();
                    }
                    this.lateCounter.inc();
                } else if (processContext.pane().getTiming() == PaneInfo.Timing.EARLY) {
                    if (i3 + i2 < Query10.this.configuration.maxLogEvents) {
                        Query10.LOG.error("ERROR! Only have %d events in early pane for %s", Integer.valueOf(i3 + i2), str);
                    }
                    this.earlyCounter.inc();
                } else {
                    this.onTimeCounter.inc();
                }
                processContext.output((KV) processContext.element());
            }
        })).apply(this.name + ".UploadEvents", ParDo.of(new DoFn<KV<String, Iterable<Event>>, KV<Void, OutputFile>>() { // from class: org.apache.beam.sdk.nexmark.queries.Query10.3
            private final Counter savedFileCounter;
            private final Counter writtenRecordsCounter;

            {
                this.savedFileCounter = Metrics.counter(Query10.this.name, "savedFile");
                this.writtenRecordsCounter = Metrics.counter(Query10.this.name, "writtenRecords");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Iterable<Event>>, KV<Void, OutputFile>>.ProcessContext processContext, BoundedWindow boundedWindow) throws IOException {
                String str = (String) ((KV) processContext.element()).getKey();
                GcsOptions as = processContext.getPipelineOptions().as(GcsOptions.class);
                OutputFile outputFileFor = Query10.this.outputFileFor(boundedWindow, str, processContext.pane());
                Query10.LOG.info(String.format("Writing %s with record timestamp %s, window timestamp %s, pane %s", str, processContext.timestamp(), boundedWindow.maxTimestamp(), processContext.pane()));
                if (outputFileFor.filename != null) {
                    Query10.LOG.info("Beginning write to '%s'", outputFileFor.filename);
                    int i2 = 0;
                    OutputStream newOutputStream = Channels.newOutputStream(Query10.this.openWritableGcsFile(as, outputFileFor.filename));
                    Throwable th = null;
                    try {
                        try {
                            Iterator it = ((Iterable) ((KV) processContext.element()).getValue()).iterator();
                            while (it.hasNext()) {
                                Event.CODER.encode((Event) it.next(), newOutputStream, Coder.Context.OUTER);
                                this.writtenRecordsCounter.inc();
                                i2++;
                                if (i2 % 10000 == 0) {
                                    Query10.LOG.info("So far written %d records to '%s'", Integer.valueOf(i2), outputFileFor.filename);
                                }
                            }
                            if (newOutputStream != null) {
                                if (0 != 0) {
                                    try {
                                        newOutputStream.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    newOutputStream.close();
                                }
                            }
                            Query10.LOG.info("Written all %d records to '%s'", Integer.valueOf(i2), outputFileFor.filename);
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (newOutputStream != null) {
                            if (th != null) {
                                try {
                                    newOutputStream.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                newOutputStream.close();
                            }
                        }
                        throw th3;
                    }
                }
                this.savedFileCounter.inc();
                processContext.output(KV.of((Object) null, outputFileFor));
            }
        })).apply(this.name + ".WindowLogFiles", Window.into(FixedWindows.of(Duration.standardSeconds(this.configuration.windowSizeSec))).triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.standardDays(1L)).discardingFiredPanes()).apply(this.name + ".GroupByKey2", GroupByKey.create()).apply(this.name + ".Index", ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() { // from class: org.apache.beam.sdk.nexmark.queries.Query10.4
            private final Counter unexpectedLateCounter;
            private final Counter unexpectedEarlyCounter;
            private final Counter unexpectedIndexCounter;
            private final Counter finalizedCounter;

            {
                this.unexpectedLateCounter = Metrics.counter(Query10.this.name, "ERROR_unexpectedLate");
                this.unexpectedEarlyCounter = Metrics.counter(Query10.this.name, "ERROR_unexpectedEarly");
                this.unexpectedIndexCounter = Metrics.counter(Query10.this.name, "ERROR_unexpectedIndex");
                this.finalizedCounter = Metrics.counter(Query10.this.name, "indexed");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<Void, Iterable<OutputFile>>, Done>.ProcessContext processContext, BoundedWindow boundedWindow) throws IOException {
                if (processContext.pane().getTiming() == PaneInfo.Timing.LATE) {
                    this.unexpectedLateCounter.inc();
                    Query10.LOG.error("ERROR! Unexpected LATE pane: %s", processContext.pane());
                    return;
                }
                if (processContext.pane().getTiming() == PaneInfo.Timing.EARLY) {
                    this.unexpectedEarlyCounter.inc();
                    Query10.LOG.error("ERROR! Unexpected EARLY pane: %s", processContext.pane());
                    return;
                }
                if (processContext.pane().getTiming() == PaneInfo.Timing.ON_TIME && processContext.pane().getIndex() != 0) {
                    this.unexpectedIndexCounter.inc();
                    Query10.LOG.error("ERROR! Unexpected ON_TIME pane index: %s", processContext.pane());
                    return;
                }
                GcsOptions as = processContext.getPipelineOptions().as(GcsOptions.class);
                Query10.LOG.info("Index with record timestamp %s, window timestamp %s, pane %s", new Object[]{processContext.timestamp(), boundedWindow.maxTimestamp(), processContext.pane()});
                String indexPathFor = Query10.this.indexPathFor(boundedWindow);
                if (indexPathFor != null) {
                    Query10.LOG.info("Beginning write to '%s'", indexPathFor);
                    int i2 = 0;
                    OutputStream newOutputStream = Channels.newOutputStream(Query10.this.openWritableGcsFile(as, indexPathFor));
                    Throwable th = null;
                    try {
                        Iterator it = ((Iterable) ((KV) processContext.element()).getValue()).iterator();
                        while (it.hasNext()) {
                            newOutputStream.write(((OutputFile) it.next()).toString().getBytes(StandardCharsets.UTF_8));
                            i2++;
                        }
                        Query10.LOG.info("Written all %d lines to '%s'", Integer.valueOf(i2), indexPathFor);
                    } finally {
                        if (newOutputStream != null) {
                            if (0 != 0) {
                                try {
                                    newOutputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                newOutputStream.close();
                            }
                        }
                    }
                }
                processContext.output(new Done("written for timestamp " + boundedWindow.maxTimestamp()));
                this.finalizedCounter.inc();
            }
        }));
    }
}
