package org.apache.beam.runners.flink.streaming;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.Joiner;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.runners.flink.FlinkTestPipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.class */
public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
    protected String resultPath;
    static final String[] EXPECTED_RESULT = {"user: user1 value:3", "user: user1 value:1", "user: user2 value:4", "user: user2 value:6", "user: user3 value:7", "user: user3 value:2"};

    protected void preSubmit() throws Exception {
        this.resultPath = getTempDirPath("result");
    }

    protected void postSubmit() throws Exception {
        compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), this.resultPath);
    }

    protected void testProgram() throws Exception {
        FlinkTestPipeline createForStreaming = FlinkTestPipeline.createForStreaming();
        Long valueOf = Long.valueOf((System.currentTimeMillis() + 10000) / 1000);
        createForStreaming.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", valueOf).set("contributor_username", "user1"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 10)).set("contributor_username", "user3"), new TableRow().set("timestamp", valueOf).set("contributor_username", "user2"), new TableRow().set("timestamp", valueOf).set("contributor_username", "user1"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 2)).set("contributor_username", "user1"), new TableRow().set("timestamp", valueOf).set("contributor_username", "user2"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 1)).set("contributor_username", "user2"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 5)).set("contributor_username", "user2"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 7)).set("contributor_username", "user2"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 8)).set("contributor_username", "user2"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 200)).set("contributor_username", "user2"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 230)).set("contributor_username", "user1"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 230)).set("contributor_username", "user2"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 240)).set("contributor_username", "user2"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 245)).set("contributor_username", "user3"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 235)).set("contributor_username", "user3"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 236)).set("contributor_username", "user3"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 237)).set("contributor_username", "user3"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 238)).set("contributor_username", "user3"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 239)).set("contributor_username", "user3"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 240)).set("contributor_username", "user3"), new TableRow().set("timestamp", Long.valueOf(valueOf.longValue() + 241)).set("contributor_username", "user2"), new TableRow().set("timestamp", valueOf).set("contributor_username", "user3")))).apply(ParDo.of(new DoFn<TableRow, String>() { // from class: org.apache.beam.runners.flink.streaming.TopWikipediaSessionsITCase.1
            @DoFn.ProcessElement
            public void processElement(DoFn<TableRow, String>.ProcessContext processContext) throws Exception {
                TableRow tableRow = (TableRow) processContext.element();
                long intValue = ((Integer) tableRow.get("timestamp")).intValue();
                String str = (String) tableRow.get("contributor_username");
                if (str != null) {
                    processContext.outputWithTimestamp(str, new Instant(intValue * 1000));
                }
            }
        })).apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1L)))).apply(Count.perElement()).apply(ParDo.of(new DoFn<KV<String, Long>, String>() { // from class: org.apache.beam.runners.flink.streaming.TopWikipediaSessionsITCase.2
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Long>, String>.ProcessContext processContext) throws Exception {
                KV kv = (KV) processContext.element();
                processContext.output("user: " + ((String) kv.getKey()) + " value:" + kv.getValue());
            }
        })).apply(TextIO.Write.to(this.resultPath));
        createForStreaming.run();
    }
}
