package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.util.List;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestStreamSource.class */
public class TestStreamSource<T> extends RichSourceFunction<WindowedValue<T>> {
    private final SerializableFunction<byte[], TestStream<T>> testStreamDecoder;
    private final byte[] payload;
    private volatile boolean isRunning = true;

    public TestStreamSource(SerializableFunction<byte[], TestStream<T>> serializableFunction, byte[] bArr) {
        this.testStreamDecoder = serializableFunction;
        this.payload = bArr;
    }

    public void run(SourceFunction.SourceContext<WindowedValue<T>> sourceContext) throws CoderException {
        List events = ((TestStream) this.testStreamDecoder.apply(this.payload)).getEvents();
        for (int i = 0; this.isRunning && i < events.size(); i++) {
            TestStream.ElementEvent elementEvent = (TestStream.Event) events.get(i);
            synchronized (sourceContext.getCheckpointLock()) {
                if (elementEvent instanceof TestStream.ElementEvent) {
                    for (TimestampedValue timestampedValue : elementEvent.getElements()) {
                        Instant timestamp = timestampedValue.getTimestamp();
                        sourceContext.collectWithTimestamp(WindowedValue.of(timestampedValue.getValue(), timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), timestamp.getMillis());
                    }
                } else {
                    if (!(elementEvent instanceof TestStream.WatermarkEvent)) {
                        if (!(elementEvent instanceof TestStream.ProcessingTimeEvent)) {
                            throw new IllegalStateException("Unknown event type " + elementEvent);
                        }
                        throw new UnsupportedOperationException("Advancing Processing time is not supported by the Flink Runner.");
                    }
                    sourceContext.emitWatermark(new Watermark(((TestStream.WatermarkEvent) elementEvent).getWatermark().getMillis()));
                }
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }
}
