package org.apache.beam.runners.flink.translation.functions;

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.class */
/**
 * Flink {@link SourceFunction} that emits a single "impulse": one empty {@code byte[]} wrapped in a
 * {@link WindowedValue} in the global window.
 *
 * <p>Whether the impulse has already been emitted is tracked in operator list state registered in
 * {@link #initializeState}; {@link #run} only emits when that state is empty. When constructed with
 * {@code keepSourceAlive == true}, {@link #run} does not return after emitting but idles until
 * {@link #cancel()} is called.
 */
public class ImpulseSourceFunction implements SourceFunction<WindowedValue<byte[]>>, CheckpointedFunction {
    /** Name under which the "impulse already emitted" marker is registered in the operator state store. */
    private static final String IMPULSE_EMITTED_STATE_NAME = "impulse-emitted";

    /** Maximum time, in milliseconds, a single idle wait lasts before {@code running} is re-checked. */
    private static final long KEEP_ALIVE_WAIT_MILLIS = 1000L;

    /** If {@code true}, {@link #run} keeps blocking after the impulse was emitted until cancelled. */
    private final boolean keepSourceAlive;

    /** Cleared by {@link #cancel()}; volatile because it is read from the idle loop in {@link #run}. */
    private volatile boolean running = true;

    /** Operator list state; non-empty once the impulse has been emitted. Set in {@link #initializeState}. */
    private transient ListState<Boolean> impulseEmitted;

    /**
     * Creates the source.
     *
     * @param z the {@code keepSourceAlive} flag: whether {@link #run} should keep blocking after the
     *     impulse has been emitted, until {@link #cancel()} is called
     */
    public ImpulseSourceFunction(boolean z) {
        this.keepSourceAlive = z;
    }

    /**
     * Emits the impulse element if the emitted-marker state is empty, then optionally idles until
     * the source is cancelled.
     *
     * @param sourceContext context used to emit the element and to obtain the checkpoint lock
     * @throws Exception if reading or updating the state, or emitting the element, fails
     */
    public void run(SourceFunction.SourceContext<WindowedValue<byte[]>> sourceContext) throws Exception {
        if (Iterables.isEmpty(this.impulseEmitted.get())) {
            // Emit the element and record that fact under the checkpoint lock so both happen together.
            synchronized (sourceContext.getCheckpointLock()) {
                sourceContext.collect(WindowedValue.valueInGlobalWindow(new byte[0]));
                this.impulseEmitted.add(true);
            }
        }
        if (this.keepSourceAlive) {
            // NOTE(review): presumably the caller needs the source task to stay up (e.g. so the job
            // keeps checkpointing) even though there is nothing left to emit - confirm with callers.
            waitUntilCancelled();
        }
    }

    /** Signals {@link #run} to leave its idle loop; takes effect at the next wake-up or interrupt. */
    public void cancel() {
        this.running = false;
    }

    /**
     * No-op: the list state is updated directly in {@link #run}, so nothing is done at snapshot time.
     *
     * @param functionSnapshotContext unused
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
    }

    /**
     * Obtains the operator list state that marks whether the impulse has been emitted.
     *
     * @param functionInitializationContext context providing access to the operator state store
     * @throws Exception if the state cannot be obtained
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.impulseEmitted =
                functionInitializationContext
                        .getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(IMPULSE_EMITTED_STATE_NAME, BooleanSerializer.INSTANCE));
    }

    /** Blocks the calling thread, waking periodically, until {@link #cancel()} has cleared {@code running}. */
    private void waitUntilCancelled() {
        // Private monitor used purely as something to wait on; nothing ever notifies it.
        final Object waitLock = new Object();
        while (this.running) {
            try {
                synchronized (waitLock) {
                    // Bounded wait so the running flag is re-checked even without an interrupt.
                    waitLock.wait(KEEP_ALIVE_WAIT_MILLIS);
                }
            } catch (InterruptedException e) {
                if (!this.running) {
                    // Cancelled: restore the interrupt flag for the caller; the loop condition ends the wait.
                    Thread.currentThread().interrupt();
                }
                // Still running: the interrupt is deliberately ignored and the wait resumes.
            }
        }
    }
}
