package org.apache.flink.state.api.utils;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/state/api/utils/WaitingSource.class */
public class WaitingSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
    private static final Map<String, OneShotLatch> guards = new HashMap();
    private final SourceFunction<T> source;
    private final TypeInformation<T> returnType;
    private final String guardId = UUID.randomUUID().toString();
    private volatile boolean running;

    public WaitingSource(SourceFunction<T> sourceFunction, TypeInformation<T> typeInformation) {
        this.source = sourceFunction;
        this.returnType = typeInformation;
        guards.put(this.guardId, new OneShotLatch());
        this.running = true;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        if (this.source instanceof RichSourceFunction) {
            this.source.setRuntimeContext(runtimeContext);
        }
    }

    public void open(Configuration configuration) throws Exception {
        if (this.source instanceof RichSourceFunction) {
            this.source.open(configuration);
        }
    }

    public void close() throws Exception {
        if (this.source instanceof RichSourceFunction) {
            this.source.close();
        }
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        OneShotLatch oneShotLatch = guards.get(this.guardId);
        try {
            this.source.run(sourceContext);
            while (this.running) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                }
            }
        } finally {
            oneShotLatch.trigger();
        }
    }

    public void cancel() {
        this.source.cancel();
        this.running = false;
    }

    public void awaitSource() throws RuntimeException {
        try {
            guards.get(this.guardId).await();
        } catch (InterruptedException e) {
            throw new RuntimeException("Failed to initialize source");
        }
    }

    public TypeInformation<T> getProducedType() {
        return this.returnType;
    }
}
