package org.apache.beam.runners.direct.portable;

import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Optional;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.CacheBuilder;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.CacheLoader;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.LoadingCache;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.RemovalListener;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.ExecutionDriver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.PipelineMessageReceiver;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.portable.DirectTransformExecutor;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.util.UserCodeException;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.class */
public final class ExecutorServiceParallelExecutor implements PipelineExecutor, BundleProcessor<PipelineNode.PCollectionNode, CommittedBundle<?>, PipelineNode.PTransformNode> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ExecutorServiceParallelExecutor.class);
    private final int targetParallelism;
    private final ExecutorService executorService;
    private final RootProviderRegistry rootRegistry;
    private final TransformEvaluatorRegistry registry;
    private final ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph;
    private final EvaluationContext evaluationContext;
    private final TransformExecutorFactory executorFactory;
    private final TransformExecutorService parallelExecutorService;
    private AtomicReference<PipelineResult.State> pipelineState = new AtomicReference<>(PipelineResult.State.RUNNING);
    private final LoadingCache<StepAndKey, TransformExecutorService> serialExecutorServices = CacheBuilder.newBuilder().weakValues().removalListener(shutdownExecutorServiceListener()).build(serialTransformExecutorServiceCacheLoader());
    private final QueueMessageReceiver visibleUpdates = new QueueMessageReceiver();

    /* renamed from: org.apache.beam.runners.direct.portable.ExecutorServiceParallelExecutor$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor$3.class */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$runners$local$ExecutionDriver$DriverState = new int[ExecutionDriver.DriverState.values().length];

        static {
            try {
                $SwitchMap$org$apache$beam$runners$local$ExecutionDriver$DriverState[ExecutionDriver.DriverState.FAILED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$runners$local$ExecutionDriver$DriverState[ExecutionDriver.DriverState.SHUTDOWN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$runners$local$ExecutionDriver$DriverState[ExecutionDriver.DriverState.CONTINUE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor$QueueMessageReceiver.class */
    public static class QueueMessageReceiver implements PipelineMessageReceiver {
        private final BlockingQueue<VisibleExecutorUpdate> updates;

        private QueueMessageReceiver() {
            this.updates = new LinkedBlockingQueue();
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.local.PipelineMessageReceiver
        public void failed(Exception exc) {
            this.updates.offer(VisibleExecutorUpdate.fromException(exc));
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.local.PipelineMessageReceiver
        public void failed(Error error) {
            this.updates.offer(VisibleExecutorUpdate.fromError(error));
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.local.PipelineMessageReceiver
        public void cancelled() {
            this.updates.offer(VisibleExecutorUpdate.cancelled());
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.local.PipelineMessageReceiver
        public void completed() {
            this.updates.offer(VisibleExecutorUpdate.finished());
        }

        /* JADX INFO: Access modifiers changed from: private */
        @Nullable
        public VisibleExecutorUpdate tryNext(Duration duration) throws InterruptedException {
            return this.updates.poll(duration.getMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor$VisibleExecutorUpdate.class */
    public static class VisibleExecutorUpdate {
        private final Optional<? extends Throwable> thrown;

        @Nullable
        private final PipelineResult.State newState;

        public static VisibleExecutorUpdate fromException(Exception exc) {
            return new VisibleExecutorUpdate(null, exc);
        }

        public static VisibleExecutorUpdate fromError(Error error) {
            return new VisibleExecutorUpdate(PipelineResult.State.FAILED, error);
        }

        public static VisibleExecutorUpdate finished() {
            return new VisibleExecutorUpdate(PipelineResult.State.DONE, null);
        }

        public static VisibleExecutorUpdate cancelled() {
            return new VisibleExecutorUpdate(PipelineResult.State.CANCELLED, null);
        }

        private VisibleExecutorUpdate(PipelineResult.State state, @Nullable Throwable th) {
            this.thrown = Optional.fromNullable(th);
            this.newState = state;
        }

        PipelineResult.State getNewState() {
            return this.newState;
        }
    }

    public static ExecutorServiceParallelExecutor create(int i, RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry transformEvaluatorRegistry, ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> executableGraph, EvaluationContext evaluationContext) {
        return new ExecutorServiceParallelExecutor(i, rootProviderRegistry, transformEvaluatorRegistry, executableGraph, evaluationContext);
    }

    private ExecutorServiceParallelExecutor(int i, RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry transformEvaluatorRegistry, ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> executableGraph, EvaluationContext evaluationContext) {
        this.targetParallelism = i;
        this.executorService = Executors.newFixedThreadPool(i, new ThreadFactoryBuilder().setThreadFactory(MoreExecutors.platformThreadFactory()).setNameFormat("direct-runner-worker").build());
        this.rootRegistry = rootProviderRegistry;
        this.registry = transformEvaluatorRegistry;
        this.graph = executableGraph;
        this.evaluationContext = evaluationContext;
        this.parallelExecutorService = TransformExecutorServices.parallel(this.executorService);
        this.executorFactory = new DirectTransformExecutor.Factory(evaluationContext, transformEvaluatorRegistry);
    }

    private CacheLoader<StepAndKey, TransformExecutorService> serialTransformExecutorServiceCacheLoader() {
        return new CacheLoader<StepAndKey, TransformExecutorService>() { // from class: org.apache.beam.runners.direct.portable.ExecutorServiceParallelExecutor.1
            @Override // org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.CacheLoader
            public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
                return TransformExecutorServices.serial(ExecutorServiceParallelExecutor.this.executorService);
            }
        };
    }

    private RemovalListener<StepAndKey, TransformExecutorService> shutdownExecutorServiceListener() {
        return removalNotification -> {
            TransformExecutorService transformExecutorService = (TransformExecutorService) removalNotification.getValue();
            if (transformExecutorService != null) {
                transformExecutorService.shutdown();
            }
        };
    }

    @Override // org.apache.beam.runners.direct.portable.PipelineExecutor
    public void start() {
        int max = Math.max(3, this.targetParallelism);
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (PipelineNode.PTransformNode pTransformNode : this.graph.getRootTransforms2()) {
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            try {
                concurrentLinkedQueue.addAll(this.rootRegistry.getInitialInputs(pTransformNode, max));
                builder.put(pTransformNode, concurrentLinkedQueue);
            } catch (Exception e) {
                throw UserCodeException.wrap(e);
            }
        }
        this.evaluationContext.initialize(builder.build());
        final ExecutionDriver create = QuiescenceDriver.create(this.evaluationContext, this.graph, this, this.visibleUpdates, builder.build());
        this.executorService.submit(new Runnable() { // from class: org.apache.beam.runners.direct.portable.ExecutorServiceParallelExecutor.2
            @Override // java.lang.Runnable
            public void run() {
                PipelineResult.State state;
                ExecutionDriver.DriverState drive = create.drive();
                if (!drive.isTermainal()) {
                    ExecutorServiceParallelExecutor.this.executorService.submit(this);
                    return;
                }
                PipelineResult.State state2 = PipelineResult.State.UNKNOWN;
                switch (AnonymousClass3.$SwitchMap$org$apache$beam$runners$local$ExecutionDriver$DriverState[drive.ordinal()]) {
                    case 1:
                        state = PipelineResult.State.FAILED;
                        break;
                    case 2:
                        state = PipelineResult.State.DONE;
                        break;
                    case 3:
                        throw new IllegalStateException(String.format("%s should not be a terminal state", ExecutionDriver.DriverState.CONTINUE));
                    default:
                        throw new IllegalArgumentException(String.format("Unknown %s %s", ExecutionDriver.DriverState.class.getSimpleName(), drive));
                }
                ExecutorServiceParallelExecutor.this.shutdownIfNecessary(state);
            }
        });
    }

    @Override // org.apache.beam.runners.direct.portable.BundleProcessor
    public void process(CommittedBundle<?> committedBundle, PipelineNode.PTransformNode pTransformNode, CompletionCallback completionCallback) {
        evaluateBundle(pTransformNode, committedBundle, completionCallback);
    }

    private <T> void evaluateBundle(PipelineNode.PTransformNode pTransformNode, CommittedBundle<T> committedBundle, CompletionCallback completionCallback) {
        TransformExecutorService transformExecutorService;
        if (isKeyed(committedBundle.getPCollection())) {
            transformExecutorService = this.serialExecutorServices.getUnchecked(StepAndKey.of(pTransformNode, committedBundle.getKey()));
        } else {
            transformExecutorService = this.parallelExecutorService;
        }
        TransformExecutor create = this.executorFactory.create(committedBundle, pTransformNode, completionCallback, transformExecutorService);
        if (this.pipelineState.get().isTerminal()) {
            return;
        }
        transformExecutorService.schedule(create);
    }

    private boolean isKeyed(PipelineNode.PCollectionNode pCollectionNode) {
        return this.evaluationContext.isKeyed(pCollectionNode);
    }

    @Override // org.apache.beam.runners.direct.portable.PipelineExecutor
    public PipelineResult.State waitUntilFinish(Duration duration) throws Exception {
        Instant instant = duration.equals(Duration.ZERO) ? new Instant(Long.MAX_VALUE) : Instant.now().plus(duration);
        VisibleExecutorUpdate visibleExecutorUpdate = null;
        while (Instant.now().isBefore(instant) && (visibleExecutorUpdate == null || isTerminalStateUpdate(visibleExecutorUpdate))) {
            visibleExecutorUpdate = this.visibleUpdates.tryNext(Duration.millis(25L));
            if (visibleExecutorUpdate == null && this.pipelineState.get().isTerminal()) {
                return this.pipelineState.get();
            }
            if (visibleExecutorUpdate != null && visibleExecutorUpdate.thrown.isPresent()) {
                Throwable th = (Throwable) visibleExecutorUpdate.thrown.get();
                if (th instanceof Exception) {
                    throw ((Exception) th);
                }
                if (th instanceof Error) {
                    throw ((Error) th);
                }
                throw new Exception("Unknown Type of Throwable", th);
            }
        }
        return this.pipelineState.get();
    }

    @Override // org.apache.beam.runners.direct.portable.PipelineExecutor
    public PipelineResult.State getPipelineState() {
        return this.pipelineState.get();
    }

    private boolean isTerminalStateUpdate(VisibleExecutorUpdate visibleExecutorUpdate) {
        return (visibleExecutorUpdate.getNewState() == null && visibleExecutorUpdate.getNewState().isTerminal()) ? false : true;
    }

    @Override // org.apache.beam.runners.direct.portable.PipelineExecutor
    public void stop() {
        shutdownIfNecessary(PipelineResult.State.CANCELLED);
        this.visibleUpdates.cancelled();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdownIfNecessary(PipelineResult.State state) {
        if (state.isTerminal()) {
            LOG.debug("Pipeline has terminated. Shutting down.");
            ArrayList arrayList = new ArrayList();
            try {
                this.serialExecutorServices.invalidateAll();
            } catch (RuntimeException e) {
                arrayList.add(e);
            }
            try {
                this.serialExecutorServices.cleanUp();
            } catch (RuntimeException e2) {
                arrayList.add(e2);
            }
            try {
                this.parallelExecutorService.shutdown();
            } catch (RuntimeException e3) {
                arrayList.add(e3);
            }
            try {
                this.executorService.shutdown();
            } catch (RuntimeException e4) {
                arrayList.add(e4);
            }
            try {
                this.registry.cleanup();
            } catch (Exception e5) {
                arrayList.add(e5);
            }
            this.pipelineState.compareAndSet(PipelineResult.State.RUNNING, state);
            if (arrayList.isEmpty()) {
                return;
            }
            IllegalStateException illegalStateException = new IllegalStateException("Error" + (arrayList.size() == 1 ? "" : "s") + " during executor shutdown:\n" + ((String) arrayList.stream().map((v0) -> {
                return v0.getMessage();
            }).collect(Collectors.joining("\n- ", "- ", ""))));
            this.visibleUpdates.failed(illegalStateException);
            throw illegalStateException;
        }
    }
}
