package gobblin.fork;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.WorkUnitState;
import gobblin.records.RecordStreamWithMetadata;
import gobblin.stream.ControlMessage;
import gobblin.stream.RecordEnvelope;
import gobblin.stream.StreamEntity;
import io.reactivex.Flowable;
import io.reactivex.functions.Predicate;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.List;

/* loaded from: input_file:WEB-INF/lib/gobblin-api-0.11.0.jar:gobblin/fork/Forker.class */
public class Forker {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/gobblin-api-0.11.0.jar:gobblin/fork/Forker$ForkFilter.class */
    /**
     * Stream predicate that lets through only the entries destined for one fork branch.
     *
     * <p>Equality, hash code and string form are all derived solely from the branch index.
     *
     * @param <D> data type carried by the wrapped stream entries
     */
    public static class ForkFilter<D> implements Predicate<RecordWithForkMap<D>> {
        private final int forkIdx;

        /**
         * @param i index of the branch this filter selects entries for
         */
        @ConstructorProperties({"forkIdx"})
        public ForkFilter(int i) {
            this.forkIdx = i;
        }

        /**
         * Asks the wrapped entry whether it should be delivered to this filter's branch.
         *
         * @param recordWithForkMap entry together with its routing information
         * @return the result of {@link RecordWithForkMap#sendToBranch(int)} for this branch index
         */
        @Override // io.reactivex.functions.Predicate
        public boolean test(RecordWithForkMap<D> recordWithForkMap) {
            return recordWithForkMap.sendToBranch(this.forkIdx);
        }

        /** @return the branch index this filter was created with */
        public int getForkIdx() {
            return this.forkIdx;
        }

        /**
         * Two filters are equal when both are {@code ForkFilter}s that accept each other via
         * {@link #canEqual(Object)} and target the same branch index.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj instanceof ForkFilter) {
                ForkFilter<?> that = (ForkFilter<?>) obj;
                if (that.canEqual(this)) {
                    return getForkIdx() == that.getForkIdx();
                }
            }
            return false;
        }

        /** Hook that lets subclasses veto equality with instances of other types. */
        protected boolean canEqual(Object obj) {
            return obj instanceof ForkFilter;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            return prime + getForkIdx();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Forker.ForkFilter(forkIdx=");
            text.append(getForkIdx());
            text.append(")");
            return text.toString();
        }
    }

    /* loaded from: input_file:WEB-INF/lib/gobblin-api-0.11.0.jar:gobblin/fork/Forker$ForkedStream.class */
    /**
     * Value holder for the per-branch streams produced by {@code Forker.forkStream}.
     *
     * <p>As built by {@code forkStream}, the list has one entry per branch and holds {@code null}
     * at the position of every branch that does not receive the schema.
     *
     * @param <D> record data type
     * @param <S> schema type
     */
    public static class ForkedStream<D, S> {
        private final List<RecordStreamWithMetadata<D, S>> forkedStreams;

        /**
         * @param list per-branch streams; stored by reference, not copied
         */
        @ConstructorProperties({"forkedStreams"})
        public ForkedStream(List<RecordStreamWithMetadata<D, S>> list) {
            this.forkedStreams = list;
        }

        /** @return the list handed to the constructor (same instance) */
        public List<RecordStreamWithMetadata<D, S>> getForkedStreams() {
            return this.forkedStreams;
        }

        /**
         * Two instances are equal when they accept each other via {@link #canEqual(Object)} and
         * their stream lists are both {@code null} or equal according to the list's own
         * {@code equals}.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ForkedStream)) {
                return false;
            }
            ForkedStream<?, ?> that = (ForkedStream<?, ?>) obj;
            if (!that.canEqual(this)) {
                return false;
            }
            Object mine = getForkedStreams();
            Object theirs = that.getForkedStreams();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        /** Hook that lets subclasses veto equality with instances of other types. */
        protected boolean canEqual(Object obj) {
            return obj instanceof ForkedStream;
        }

        @Override
        public int hashCode() {
            Object streams = getForkedStreams();
            // 43 is the stand-in hash used when the list is null.
            int streamsHash = (streams != null) ? streams.hashCode() : 43;
            return 59 + streamsHash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Forker.ForkedStream(forkedStreams=");
            text.append(getForkedStreams());
            text.append(")");
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/gobblin-api-0.11.0.jar:gobblin/fork/Forker$RecordWithForkMap.class */
    /**
     * Pairs a stream entity with the information needed to route it to fork branches.
     *
     * <p>A {@link RecordEnvelope} carries a per-branch boolean map. A {@link ControlMessage} has
     * no map: it is delivered to every branch and is always cloned on retrieval.
     *
     * @param <D> record data type
     */
    public static class RecordWithForkMap<D> {
        private final StreamEntity<D> record;
        // Per-branch routing flags; null when the wrapped entity is a control message.
        private final List<Boolean> forkMap;
        private final boolean mustCopy;

        /**
         * Wraps a data record with its routing map.
         *
         * @param recordEnvelope the data record
         * @param list           per-branch flags; a private copy is kept, and cloning is enabled
         *                       when {@link Forker#mustCopy(List)} reports true for it
         */
        public RecordWithForkMap(RecordEnvelope<D> recordEnvelope, List<Boolean> list) {
            this.record = recordEnvelope;
            this.forkMap = Lists.newArrayList(list);
            this.mustCopy = Forker.mustCopy(list);
        }

        /**
         * Wraps a control message, which has no routing map and is always cloned.
         *
         * @param controlMessage the control message
         */
        public RecordWithForkMap(ControlMessage<D> controlMessage) {
            this.record = controlMessage;
            this.forkMap = null;
            this.mustCopy = true;
        }

        /**
         * Returns the wrapped entity, or a clone of it when copying was flagged at construction.
         *
         * <p>Decompiler note: this member was originally declared private.
         *
         * @return the entity itself or the result of its {@code getClone()}
         * @throws CopyNotSupportedException as declared; presumably raised by {@code getClone()}
         */
        public StreamEntity<D> getRecordCopyIfNecessary() throws CopyNotSupportedException {
            if (!this.mustCopy) {
                return this.record;
            }
            return this.record.getClone();
        }

        /**
         * Decides whether the wrapped entity goes to the given branch.
         *
         * @param i branch index, used to look up the routing map for data records
         * @return the routing flag at {@code i} for a {@link RecordEnvelope}; {@code true} for
         *         anything else
         */
        public boolean sendToBranch(int i) {
            boolean isDataRecord = this.record instanceof RecordEnvelope;
            return !isDataRecord || this.forkMap.get(i);
        }
    }

    /**
     * Splits an input record stream into one output stream per branch of a fork operator.
     *
     * <p>Before any stream is built, the branch count is written to {@code workUnitState} under
     * {@link ConfigurationKeys#FORK_BRANCHES_KEY} and {@code forkOperator.init} is called.
     *
     * <p>Each {@link RecordEnvelope} is tagged with the routing map returned by
     * {@code forkOperator.forkDataRecord}; each {@link ControlMessage} is wrapped so that it is
     * sent to every branch. Any other entity type makes the stream fail with an
     * {@link IllegalStateException} when that entity is processed. When two or more branches are
     * active, the tagged stream is shared between them and each branch gets its own copy of the
     * schema.
     *
     * @param recordStreamWithMetadata input stream and schema to fork
     * @param forkOperator             supplies the branch count, the schema fork map and the
     *                                 per-record routing map
     * @param workUnitState            state handed to the operator; receives the branch-count property
     * @param <D>                      record data type
     * @param <S>                      schema type
     * @return holder of a list with one entry per branch; the entry is {@code null} for every
     *         branch whose schema flag is false
     * @throws IllegalStateException if the schema fork map size differs from the branch count
     * @throws Exception             anything raised by the fork operator or by copying the schema
     */
    public <D, S> ForkedStream<D, S> forkStream(RecordStreamWithMetadata<D, S> recordStreamWithMetadata, ForkOperator<S, D> forkOperator, WorkUnitState workUnitState) throws Exception {
        int branches = forkOperator.getBranches(workUnitState);
        // Publish the branch count explicitly so later stages can read it from the state.
        workUnitState.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, Integer.valueOf(branches));
        forkOperator.init(workUnitState);

        List<Boolean> forkSchema = forkOperator.forkSchema(workUnitState, recordStreamWithMetadata.getSchema());
        int activeBranches = (int) forkSchema.stream().filter(Boolean::booleanValue).count();
        Preconditions.checkState(forkSchema.size() == branches, String.format("Number of forked schemas [%d] is not equal to number of branches [%d]", Integer.valueOf(forkSchema.size()), Integer.valueOf(branches)));

        // Attach routing information to every entity flowing through the input stream.
        Flowable<RecordWithForkMap<D>> taggedStream = recordStreamWithMetadata.getRecordStream().map(entity -> {
            if (entity instanceof RecordEnvelope) {
                RecordEnvelope<D> envelope = (RecordEnvelope<D>) entity;
                return new RecordWithForkMap<D>(envelope, forkOperator.forkDataRecord(workUnitState, envelope.getRecord()));
            }
            if (entity instanceof ControlMessage) {
                return new RecordWithForkMap<D>((ControlMessage<D>) entity);
            }
            throw new IllegalStateException("Expected RecordEnvelope or ControlMessage.");
        });

        // One count drives both decisions: share the upstream and copy the schema only when
        // at least two branches are active.
        boolean mustCopy = activeBranches > 1;
        if (mustCopy) {
            taggedStream = taggedStream.share();
        }

        List<RecordStreamWithMetadata<D, S>> branchStreams = Lists.newArrayList();
        for (int branch = 0; branch < forkSchema.size(); branch++) {
            if (!forkSchema.get(branch)) {
                // Keep list positions aligned with branch indices for inactive branches.
                branchStreams.add(null);
                continue;
            }
            // The cast is unchecked: CopyHelper.copy is expected to hand back a copy of the same schema type.
            branchStreams.add(recordStreamWithMetadata.withRecordStream(
                    taggedStream.filter(new ForkFilter<D>(branch)).map(RecordWithForkMap::getRecordCopyIfNecessary),
                    mustCopy ? (S) CopyHelper.copy(recordStreamWithMetadata.getSchema()) : recordStreamWithMetadata.getSchema()));
        }
        return new ForkedStream<>(branchStreams);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports whether a fork map selects two or more branches, in which case the forked
     * entity has to be copied rather than shared.
     *
     * <p>Every element is inspected, so a {@code null} list or a {@code null} element results in
     * a {@link NullPointerException}.
     *
     * @param list per-branch flags
     * @return {@code true} when at least two flags are true
     */
    public static boolean mustCopy(List<Boolean> list) {
        long selected = 0;
        for (Boolean flag : list) {
            if (flag.booleanValue()) {
                selected++;
            }
        }
        return selected >= 2;
    }
}
