package org.apache.beam.examples.common;

import com.google.common.annotations.VisibleForTesting;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

/* loaded from: input_file:org/apache/beam/examples/common/WriteWindowedFilesDoFn.class */
/**
 * A {@link DoFn} that writes the entries grouped under each {@link IntervalWindow} to a text file
 * dedicated to that window.
 *
 * <p>Each input element pairs a window with the {@code KV<String, Long>} entries belonging to it.
 * For every element one file is created, named by {@link #fileForWindow(String, IntervalWindow)},
 * and each entry is written as a {@code "key: value"} line terminated by {@link #NEWLINE}.
 */
public class WriteWindowedFilesDoFn extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> {
    /** UTF-8 encoded line terminator appended after every entry. */
    static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    /** Coder used to serialize each output line to the file stream. */
    static final Coder<String> STRING_CODER = StringUtf8Coder.of();

    /** Hour-and-minute ISO formatter used to render the window bounds in file names. */
    private static final DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();

    /** Prefix from which every per-window file name is derived. */
    private final String output;

    /**
     * Creates a function writing to files whose names start with the given prefix.
     *
     * @param str the output file name prefix
     */
    public WriteWindowedFilesDoFn(String str) {
        this.output = str;
    }

    /**
     * Builds the file name for a window as {@code <prefix>-<start>-<end>}, where the start and
     * end of the window are rendered with the hour-and-minute formatter.
     *
     * @param str the output file name prefix
     * @param intervalWindow the window whose bounds are embedded in the name
     * @return the file name for the given prefix and window
     */
    @VisibleForTesting
    public static String fileForWindow(String str, IntervalWindow intervalWindow) {
        return String.format("%s-%s-%s", str, formatter.print(intervalWindow.start()), formatter.print(intervalWindow.end()));
    }

    /**
     * Writes all entries of the current element to the file associated with its window.
     *
     * @param processContext the context supplying the current (window, entries) element
     * @throws Exception if the file cannot be created or an entry cannot be written
     */
    @DoFn.ProcessElement
    public void processElement(DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void>.ProcessContext processContext) throws Exception {
        KV<IntervalWindow, Iterable<KV<String, Long>>> element = processContext.element();
        String fileForWindow = fileForWindow(this.output, element.getKey());
        // try-with-resources guarantees the stream (and its underlying channel) is closed even
        // when encoding or writing fails part-way through the window's entries.
        try (OutputStream out = Channels.newOutputStream(IOChannelUtils.getFactory(fileForWindow).create(fileForWindow, "text/plain"))) {
            for (KV<String, Long> entry : element.getValue()) {
                STRING_CODER.encode(entry.getKey() + ": " + entry.getValue(), out, Coder.Context.OUTER);
                out.write(NEWLINE);
            }
        }
    }
}
