package org.apache.beam.runners.flink.translation.functions;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.class */
public class ReferenceCountingFlinkExecutableStageContextFactory implements FlinkExecutableStageContext.Factory {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ReferenceCountingFlinkExecutableStageContextFactory.class);
    private static final int MAX_RETRY = 3;
    private final Creator creator;
    private volatile transient ScheduledExecutorService executor;
    private volatile transient ConcurrentHashMap<String, WrappedContext> keyRegistry;

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory$Creator.class */
    public interface Creator extends ThrowingFunction<JobInfo, FlinkExecutableStageContext>, Serializable {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.class */
    public class WrappedContext implements FlinkExecutableStageContext {
        private JobInfo jobInfo;
        private AtomicInteger referenceCount = new AtomicInteger(0);

        @VisibleForTesting
        FlinkExecutableStageContext context;

        WrappedContext(JobInfo jobInfo, FlinkExecutableStageContext flinkExecutableStageContext) {
            this.jobInfo = jobInfo;
            this.context = flinkExecutableStageContext;
        }

        @Override // org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext
        public StageBundleFactory getStageBundleFactory(ExecutableStage executableStage) {
            return this.context.getStageBundleFactory(executableStage);
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            ReferenceCountingFlinkExecutableStageContextFactory.this.scheduleRelease(this.jobInfo);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void closeActual() throws Exception {
            this.context.close();
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.jobInfo.jobId(), ((WrappedContext) obj).jobInfo.jobId());
        }

        public int hashCode() {
            return Objects.hash(this.jobInfo);
        }

        public String toString() {
            return "ContextWrapper{jobId='" + this.jobInfo + "', referenceCount=" + this.referenceCount + '}';
        }
    }

    public static ReferenceCountingFlinkExecutableStageContextFactory create(Creator creator) {
        return new ReferenceCountingFlinkExecutableStageContextFactory(creator);
    }

    private ReferenceCountingFlinkExecutableStageContextFactory(Creator creator) {
        this.creator = creator;
    }

    @Override // org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext.Factory
    public FlinkExecutableStageContext get(JobInfo jobInfo) {
        for (int i = 0; i < 3; i++) {
            WrappedContext computeIfAbsent = getCache().computeIfAbsent(jobInfo.jobId(), str -> {
                try {
                    return new WrappedContext(jobInfo, this.creator.apply(jobInfo));
                } catch (Exception e) {
                    throw new RuntimeException("Unable to create context for job " + jobInfo.jobId(), e);
                }
            });
            synchronized (computeIfAbsent) {
                if (computeIfAbsent.referenceCount != null) {
                    computeIfAbsent.referenceCount.incrementAndGet();
                    return computeIfAbsent;
                }
            }
        }
        throw new RuntimeException(String.format("Max retry %s exhausted while creating Context for job %s", 3, jobInfo.jobId()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleRelease(JobInfo jobInfo) {
        WrappedContext wrappedContext = getCache().get(jobInfo.jobId());
        Preconditions.checkState(wrappedContext != null, "Releasing context for unknown job: " + jobInfo.jobId());
        int environmentCacheMillis = ((PortablePipelineOptions) PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()).as(PortablePipelineOptions.class)).getEnvironmentCacheMillis();
        if (environmentCacheMillis <= 0) {
            release(wrappedContext);
        } else if (getClass().getClassLoader() == ExecutionEnvironment.class.getClassLoader()) {
            getExecutor().schedule(() -> {
                release(wrappedContext);
            }, environmentCacheMillis, TimeUnit.MILLISECONDS);
        } else {
            LOG.warn("{} is not loaded on parent Flink classloader. Falling back to synchronous environment release for job {}.", getClass(), jobInfo.jobId());
            release(wrappedContext);
        }
    }

    private ConcurrentHashMap<String, WrappedContext> getCache() {
        ConcurrentHashMap<String, WrappedContext> concurrentHashMap;
        if (this.keyRegistry != null) {
            return this.keyRegistry;
        }
        synchronized (this) {
            if (this.keyRegistry == null) {
                this.keyRegistry = new ConcurrentHashMap<>();
            }
            concurrentHashMap = this.keyRegistry;
        }
        return concurrentHashMap;
    }

    private ScheduledExecutorService getExecutor() {
        ScheduledExecutorService scheduledExecutorService;
        if (this.executor != null) {
            return this.executor;
        }
        synchronized (this) {
            if (this.executor == null) {
                this.executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
            }
            scheduledExecutorService = this.executor;
        }
        return scheduledExecutorService;
    }

    @VisibleForTesting
    void release(FlinkExecutableStageContext flinkExecutableStageContext) {
        WrappedContext wrappedContext = (WrappedContext) flinkExecutableStageContext;
        synchronized (wrappedContext) {
            if (wrappedContext.referenceCount.decrementAndGet() == 0) {
                wrappedContext.referenceCount = null;
                if (getCache().remove(wrappedContext.jobInfo.jobId(), wrappedContext)) {
                    try {
                        wrappedContext.closeActual();
                    } catch (Throwable th) {
                        LOG.error("Unable to close FlinkExecutableStageContext.", th);
                    }
                }
            }
        }
    }
}
