/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.utils;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * A source wrapper that runs a delegate {@link SourceFunction} and then keeps {@link #run}
 * blocked until {@link #cancel()} is called.
 *
 * <p>Every instance registers a {@link OneShotLatch} in a static registry under a random id.
 * The latch is triggered as soon as the delegate's {@code run} returns or throws, and
 * {@link #awaitSource()} blocks on that latch.
 *
 * <p>NOTE: the registry is a static map that is only populated by the constructor, so the latch
 * can only be found inside the JVM in which the constructor ran. Entries are never removed.
 *
 * @param <T> type of the records produced by the delegate source
 */
public class WaitingSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
    /**
     * Latches keyed by guard id. Written by the constructor and read by {@link #run} and
     * {@link #awaitSource()}, which may execute on different threads, hence a concurrent map.
     */
    private static final Map<String, OneShotLatch> guards = new ConcurrentHashMap<>();

    private final SourceFunction<T> source;
    private final TypeInformation<T> returnType;
    private final String guardId;
    private volatile boolean running;

    /**
     * Creates the wrapper and registers a fresh latch for it.
     *
     * @param source the delegate whose {@code run} is executed first
     * @param returnType the type information reported by {@link #getProducedType()}
     */
    public WaitingSource(SourceFunction<T> source, TypeInformation<T> returnType) {
        this.source = source;
        this.returnType = returnType;
        this.guardId = UUID.randomUUID().toString();
        guards.put(this.guardId, new OneShotLatch());
        this.running = true;
    }

    /**
     * Stores the runtime context on this wrapper and forwards it to the delegate when the
     * delegate is a {@link RichSourceFunction}.
     */
    public void setRuntimeContext(RuntimeContext t) {
        // Keep the wrapper's own context initialised so getRuntimeContext() works on it too.
        super.setRuntimeContext(t);
        if (this.source instanceof RichSourceFunction) {
            ((RichSourceFunction<T>) this.source).setRuntimeContext(t);
        }
    }

    /** Forwards {@code open} to the delegate when it is a {@link RichSourceFunction}. */
    public void open(Configuration parameters) throws Exception {
        if (this.source instanceof RichSourceFunction) {
            ((RichSourceFunction<T>) this.source).open(parameters);
        }
    }

    /** Forwards {@code close} to the delegate when it is a {@link RichSourceFunction}. */
    public void close() throws Exception {
        if (this.source instanceof RichSourceFunction) {
            ((RichSourceFunction<T>) this.source).close();
        }
    }

    /**
     * Runs the delegate, triggers the latch once the delegate returns or throws, and then idles
     * until {@link #cancel()} clears the running flag.
     *
     * @throws IllegalStateException if no latch is registered for this instance
     * @throws Exception whatever the delegate's {@code run} throws
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        // Resolve the latch up front: a missing latch must not surface as an NPE inside the
        // finally block, where it would hide any exception thrown by the delegate.
        OneShotLatch latch = lookupLatch();
        try {
            this.source.run(ctx);
        } finally {
            latch.trigger();
        }
        while (this.running) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException ignored) {
                // Deliberately ignored: only cancel() ends the wait, an interrupt alone does not.
            }
        }
    }

    /** Cancels the delegate and releases the wait loop in {@link #run}. */
    public void cancel() {
        try {
            this.source.cancel();
        } finally {
            // Always release the wait loop, even if the delegate's cancel() throws.
            this.running = false;
        }
    }

    /**
     * Blocks until the latch of this instance has been triggered by {@link #run}.
     *
     * @throws IllegalStateException if no latch is registered for this instance
     * @throws RuntimeException if the calling thread is interrupted while waiting; the interrupt
     *     flag is restored and the {@link InterruptedException} is attached as the cause
     */
    public void awaitSource() throws RuntimeException {
        try {
            lookupLatch().await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to initialize source", e);
        }
    }

    /** Returns the type information passed to the constructor. */
    public TypeInformation<T> getProducedType() {
        return this.returnType;
    }

    /** Looks up this instance's latch, failing with a descriptive error when it is missing. */
    private OneShotLatch lookupLatch() {
        OneShotLatch latch = guards.get(this.guardId);
        if (latch == null) {
            throw new IllegalStateException(
                    "No latch registered for guard id " + this.guardId
                            + "; the latch registry is only populated in the JVM that constructed this source");
        }
        return latch;
    }
}

