package cascading.flow.local.stream.duct;

import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Fork;
import cascading.tuple.TupleEntry;
import java.lang.ref.WeakReference;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/flow/local/stream/duct/ParallelFork.class */
/**
 * A {@link Fork} that hands every {@code start}, {@code receive} and {@code complete} call to all
 * of its downstream ducts asynchronously.
 *
 * <p>For each downstream duct passed to the constructor there is one unbounded
 * {@link LinkedBlockingQueue} of {@link Message}s and one worker task. The worker tasks are
 * submitted to a fixed thread pool (one thread per downstream duct) when {@link #start(Duct)} is
 * called, and each worker drains its own queue, replaying every message against its duct until it
 * has processed a terminating message.
 *
 * <p>Failures raised by a downstream duct are not thrown at the point they occur; the worker
 * stops and hands the {@link Throwable} back through its {@link Future}, and
 * {@link #complete(Duct)} rethrows the first one it finds wrapped in a {@link RuntimeException}.
 *
 * @param <Outgoing> the outgoing type parameter forwarded to {@link Fork}
 */
public class ParallelFork<Outgoing> extends Fork<TupleEntry, Outgoing> {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelFork.class);

    /** One message queue per downstream duct, in the order the ducts were given. */
    private final ArrayList<LinkedBlockingQueue<Message>> buffers;
    /** Thread pool running the worker tasks; shut down at the end of {@link #complete(Duct)}. */
    private final ExecutorService executor;
    /** One worker task per downstream duct; submitted to {@link #executor} by {@link #start(Duct)}. */
    private final ArrayList<Callable<Throwable>> actions;
    /** Results of the submitted worker tasks; populated by {@link #start(Duct)}. */
    private final ArrayList<Future<Throwable>> futures;
    /** Non-null once {@link #start(Duct)} has run; weakly holds the duct that started this fork. */
    private WeakReference<Duct> started;
    /** Non-null once {@link #complete(Duct)} has run; weakly holds the duct that completed this fork. */
    private WeakReference<Duct> completed;

    /** A unit of work queued for a downstream duct, remembering the upstream duct it came from. */
    public static abstract class Message {
        /** The upstream duct that issued the call this message represents. */
        protected final Duct previous;

        public Message(Duct duct) {
            this.previous = duct;
        }

        /**
         * Replays this message against the given downstream duct.
         *
         * @param duct the downstream duct to invoke
         */
        public abstract void passOn(Duct duct);

        /**
         * @return {@code true} if a worker must stop reading its queue after passing this message on
         */
        public abstract boolean isTermination();
    }

    /** Message that forwards {@code complete} downstream and ends the worker loop. */
    static final class CompleteMessage extends Message {
        public CompleteMessage(Duct duct) {
            super(duct);
        }

        @Override
        public void passOn(Duct duct) {
            duct.complete(this.previous);
        }

        @Override
        public boolean isTermination() {
            return true;
        }
    }

    /**
     * Message that forwards a {@code receive} call downstream.
     *
     * <p>The same message instance, and therefore the same {@link #tuple} instance, is offered to
     * every downstream queue.
     */
    public static final class ReceiveMessage extends Message {
        final int ordinal;
        final TupleEntry tuple;

        public ReceiveMessage(Duct duct, int i, TupleEntry tupleEntry) {
            super(duct);
            this.ordinal = i;
            // A new TupleEntry is built from the incoming one rather than keeping the reference.
            // NOTE(review): presumably this isolates the queued message from upstream reuse of
            // the entry - confirm against TupleEntry's copy-constructor semantics.
            this.tuple = new TupleEntry(tupleEntry);
        }

        @Override
        public void passOn(Duct duct) {
            duct.receive(this.previous, this.ordinal, this.tuple);
        }

        @Override
        public boolean isTermination() {
            return false;
        }
    }

    /** Message that forwards {@code start} downstream and signals the latch shared by all workers. */
    static final class StartMessage extends Message {
        final CountDownLatch startLatch;

        public StartMessage(Duct duct, CountDownLatch countDownLatch) {
            super(duct);
            this.startLatch = countDownLatch;
        }

        @Override
        public void passOn(Duct duct) {
            // Count down before invoking the duct, so the thread waiting in ParallelFork.start()
            // is released as soon as every worker has picked the message up, regardless of how
            // long (or whether) the downstream start() call finishes.
            this.startLatch.countDown();
            duct.start(this.previous);
        }

        @Override
        public boolean isTermination() {
            return false;
        }
    }

    /**
     * Worker task for a single downstream duct: takes messages from the queue and passes them on
     * until a terminating message has been processed.
     */
    private static final class BranchWorker implements Callable<Throwable> {
        private final LinkedBlockingQueue<Message> queue;
        private final Duct duct;

        BranchWorker(LinkedBlockingQueue<Message> queue, Duct duct) {
            this.queue = queue;
            this.duct = duct;
        }

        /**
         * @return {@code null} after the terminating message was passed on, otherwise the
         *         {@link Throwable} that stopped the loop
         */
        @Override
        public Throwable call() throws Exception {
            try {
                while (true) {
                    Message message = this.queue.take();
                    message.passOn(this.duct);
                    if (message.isTermination()) {
                        return null;
                    }
                }
            } catch (Throwable failure) {
                // Failures are returned instead of thrown so complete() can collect them from
                // the Future without unwrapping an ExecutionException.
                return failure;
            }
        }
    }

    /**
     * Creates the fork, its thread pool (one thread per downstream duct), and one queue plus one
     * worker task per downstream duct. No task is submitted until {@link #start(Duct)}.
     *
     * @param ductArr the downstream ducts
     */
    public ParallelFork(Duct[] ductArr) {
        super(ductArr);
        this.started = null;
        this.completed = null;
        this.executor = Executors.newFixedThreadPool(ductArr.length);
        this.buffers = new ArrayList<>(ductArr.length);
        this.futures = new ArrayList<>(ductArr.length);
        this.actions = new ArrayList<>(ductArr.length);
        for (Duct duct : ductArr) {
            LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue<>();
            this.buffers.add(queue);
            this.actions.add(new BranchWorker(queue, duct));
        }
    }

    /** Delegates to the superclass initialization. */
    public void initialize() {
        super.initialize();
    }

    /** Offers the same message instance to the queue of every downstream duct. */
    private void broadcastMessage(Message message) {
        for (LinkedBlockingQueue<Message> queue : this.buffers) {
            queue.offer(message);
        }
    }

    /**
     * Submits the worker tasks, broadcasts a {@link StartMessage}, and blocks until every worker
     * has picked that message up.
     *
     * <p>A second call is logged as an error and otherwise ignored.
     *
     * @param duct the upstream duct starting this fork
     * @throws IllegalStateException if this fork has already been completed
     * @throws UndeclaredThrowableException if the calling thread is interrupted while waiting;
     *         the thread's interrupt status is restored before throwing
     */
    public void start(Duct duct) {
        LOG.debug("StartMessage {} BEGIN", duct);
        synchronized (this) {
            if (this.started != null) {
                LOG.error("ParallelFork already started! former previous={}, new previous={}", this.started.get(), duct);
                return;
            }
            if (this.completed != null) {
                throw new IllegalStateException("cannot start an already completed ParallelFork");
            }
            this.started = new WeakReference<>(duct);
            try {
                for (Callable<Throwable> action : this.actions) {
                    this.futures.add(this.executor.submit(action));
                }
                // NOTE(review): the latch is sized from the inherited allNext array while the
                // workers are created from the constructor argument; these are presumably the
                // same ducts - confirm in Fork.
                CountDownLatch startLatch = new CountDownLatch(this.allNext.length);
                broadcastMessage(new StartMessage(duct, startLatch));
                startLatch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe
                // that this thread was interrupted.
                Thread.currentThread().interrupt();
                throw new UndeclaredThrowableException(e);
            }
        }
    }

    /**
     * Queues the given tuple for every downstream duct and returns without waiting for delivery.
     *
     * @param duct the upstream duct delivering the tuple
     * @param i the ordinal passed through to the downstream {@code receive} call
     * @param tupleEntry the incoming tuple entry
     */
    public void receive(Duct duct, int i, TupleEntry tupleEntry) {
        broadcastMessage(new ReceiveMessage(duct, i, tupleEntry));
    }

    /**
     * Broadcasts a {@link CompleteMessage}, waits for the worker tasks to finish, and shuts the
     * thread pool down.
     *
     * <p>A second call is logged as an error and otherwise ignored.
     *
     * @param duct the upstream duct completing this fork
     * @throws RuntimeException wrapping the first failure found while waiting on the workers (a
     *         {@link Throwable} returned by a worker, an {@link ExecutionException}, or an
     *         {@link InterruptedException}, in which case the interrupt status is restored first);
     *         remaining workers are not inspected after the first failure
     */
    public void complete(Duct duct) {
        synchronized (this) {
            if (this.completed != null) {
                LOG.error("ParallelFork already complete! former previous={} new previous={}", this.completed.get(), duct);
                return;
            }
            this.completed = new WeakReference<>(duct);
            // The terminating message makes every still-running worker leave its loop.
            broadcastMessage(new CompleteMessage(duct));
            try {
                for (Future<Throwable> future : this.futures) {
                    Throwable failure;
                    try {
                        failure = future.get();
                    } catch (InterruptedException e) {
                        // Keep the interruption visible to callers even though it is rethrown
                        // wrapped in a RuntimeException below.
                        Thread.currentThread().interrupt();
                        failure = e;
                    } catch (ExecutionException e) {
                        failure = e;
                    }
                    if (failure != null) {
                        throw new RuntimeException(failure);
                    }
                }
            } finally {
                this.executor.shutdown();
            }
        }
    }
}
