package org.apache.beam.runners.flink.streaming;

import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkTestPipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.flink.test.util.AbstractTestBase;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.class */
public class GroupByNullKeyTest extends AbstractTestBase implements Serializable {
    protected String resultDir;
    protected String resultPath;
    static final String[] EXPECTED_RESULT = {"k: null v: user1 user1 user1 user2 user2 user2 user2 user3"};

    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/GroupByNullKeyTest$ExtractUserAndTimestamp.class */
    private static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
        private ExtractUserAndTimestamp() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<Integer, String>, String>.ProcessContext processContext) {
            KV kv = (KV) processContext.element();
            int intValue = ((Integer) kv.getKey()).intValue();
            String str = (String) kv.getValue();
            if (str != null) {
                processContext.outputWithTimestamp(str, new Instant(intValue));
            }
        }
    }

    @Before
    public void preSubmit() throws Exception {
        File createAndRegisterTempFile = createAndRegisterTempFile("result");
        this.resultDir = createAndRegisterTempFile.toURI().toString();
        this.resultPath = new File(createAndRegisterTempFile, "file.txt").getAbsolutePath();
    }

    @After
    public void postSubmit() throws Exception {
        compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), this.resultDir);
    }

    @Test
    public void testProgram() throws Exception {
        FlinkTestPipeline createForStreaming = FlinkTestPipeline.createForStreaming();
        createForStreaming.getOptions().as(FlinkPipelineOptions.class).setParallelism(1);
        createForStreaming.apply(Create.of(Arrays.asList(KV.of(0, "user1"), KV.of(1, "user1"), KV.of(2, "user1"), KV.of(10, "user2"), KV.of(1, "user2"), KV.of(15000, "user2"), KV.of(12000, "user2"), KV.of(25000, "user3")))).apply(ParDo.of(new ExtractUserAndTimestamp())).apply(Window.into(FixedWindows.of(Duration.standardHours(1L))).triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO).discardingFiredPanes()).apply(ParDo.of(new DoFn<String, KV<Void, String>>() { // from class: org.apache.beam.runners.flink.streaming.GroupByNullKeyTest.1
            @DoFn.ProcessElement
            public void processElement(DoFn<String, KV<Void, String>>.ProcessContext processContext) throws Exception {
                processContext.output(KV.of((Object) null, (String) processContext.element()));
            }
        })).apply(GroupByKey.create()).apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { // from class: org.apache.beam.runners.flink.streaming.GroupByNullKeyTest.2
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<Void, Iterable<String>>, String>.ProcessContext processContext) throws Exception {
                KV kv = (KV) processContext.element();
                StringBuilder sb = new StringBuilder();
                sb.append("k: " + kv.getKey() + " v:");
                Iterator it = ((Iterable) kv.getValue()).iterator();
                while (it.hasNext()) {
                    sb.append(" " + ((String) it.next()));
                }
                processContext.output(sb.toString());
            }
        })).apply(TextIO.write().to(this.resultPath));
        createForStreaming.run();
    }
}
