package io.dstream;

import io.dstream.SerializableStreamAssets;
import io.dstream.function.DStreamToStreamAdapterFunction;
import io.dstream.utils.Assert;
import io.dstream.utils.Tuples;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/dstream/AbstractStreamMergingFunction.class */
/**
 * Base class for functions that receive a stream of streams, run every inner stream through a
 * pre-processing function and delegate the resulting list to {@link #doApply(List)}.
 *
 * <p>The class also keeps a list of "check point procedures": pairs of a running stream count
 * and an optional function. Entries are appended by {@link #addCheckPoint(int)} and the function
 * of the most recent entry is extended by the {@code addTransformationOrPredicate} methods.
 * How those entries are consumed is up to subclasses.
 */
public abstract class AbstractStreamMergingFunction implements SerializableStreamAssets.SerFunction<Stream<Stream<?>>, Stream<?>> {
    private static final long serialVersionUID = -7336517082191905937L;

    /** Applied to every inner stream before the streams are handed to {@link #doApply(List)}. */
    private final SerializableStreamAssets.SerFunction<Stream<?>, Stream<?>> streamPreProcessingFunction;

    /** Running total; starts at 1 and grows by the argument of each {@link #addCheckPoint(int)} call. */
    private int streamCounter = 1;

    /**
     * One entry per check point: {@code _1} is the value of the running counter at the time the
     * check point was added, {@code _2} is the function accumulated for it (or {@code null} when
     * none has been registered yet).
     */
    protected List<Tuples.Tuple2<Integer, Object>> checkPointProcedures = new ArrayList<>();

    /**
     * @param serFunction function applied to each inner stream in {@link #apply(Stream)}
     */
    public AbstractStreamMergingFunction(SerializableStreamAssets.SerFunction<Stream<?>, Stream<?>> serFunction) {
        this.streamPreProcessingFunction = serFunction;
    }

    /**
     * Pre-processes every inner stream, collects the results into a list and passes that list
     * to {@link #doApply(List)}.
     *
     * @param stream the streams to process; must not be {@code null}
     * @return whatever {@link #doApply(List)} returns
     * @throws IllegalStateException if pre-processing or {@link #doApply(List)} throws; the
     *         original exception is preserved as the cause
     */
    @Override // java.util.function.Function
    public Stream<?> apply(Stream<Stream<?>> stream) {
        Assert.notNull(stream, "'streams' must not be null");
        try {
            // The explicit type witness keeps the element type at Stream<?> (instead of a
            // captured wildcard), so no raw List cast is needed.
            List<Stream<?>> preProcessed = stream
                    .<Stream<?>>map(inner -> this.streamPreProcessingFunction.apply(inner))
                    .collect(Collectors.toList());
            return doApply(preProcessed);
        } catch (Exception e) {
            // The cause travels with the rethrown exception; printing it here as well would
            // only duplicate the stack trace on stderr.
            throw new IllegalStateException("Failed to combine streams", e);
        }
    }

    /**
     * Produces the resulting stream from the already pre-processed streams.
     *
     * @param list the pre-processed streams, in encounter order of the original outer stream
     * @return the resulting stream
     */
    protected abstract Stream<?> doApply(List<Stream<?>> list);

    /**
     * Advances the running stream counter by {@code i} and appends a new check point carrying
     * the updated counter and no function.
     *
     * @param i amount added to the running counter
     */
    public void addCheckPoint(int i) {
        this.streamCounter += i;
        this.checkPointProcedures.add(Tuples.Tuple2.tuple2(Integer.valueOf(this.streamCounter), null));
    }

    /**
     * Wraps the arguments in a {@link DStreamToStreamAdapterFunction} and registers it via
     * {@link #addTransformationOrPredicate(SerializableStreamAssets.SerFunction)}.
     *
     * @param str first constructor argument of the adapter function
     * @param obj second constructor argument of the adapter function
     * @throws IllegalStateException if no check point has been added yet
     */
    public void addTransformationOrPredicate(String str, Object obj) {
        addTransformationOrPredicate(new DStreamToStreamAdapterFunction(str, obj));
    }

    /**
     * Attaches {@code serFunction} to the most recently added check point. If that check point
     * already has a function, the stored function becomes
     * {@code serFunction.compose(existing)}, i.e. the existing function runs first.
     *
     * @param serFunction the function to attach
     * @throws IllegalStateException if no check point has been added yet
     */
    // Raw types are kept deliberately: the stored functions are held as Object and their type
    // arguments are unknown here, so compose() cannot be called in a type-checked way.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void addTransformationOrPredicate(SerializableStreamAssets.SerFunction serFunction) {
        if (this.checkPointProcedures.isEmpty()) {
            throw new IllegalStateException(
                    "No check point available; addCheckPoint(int) must be called before adding a transformation or predicate");
        }
        int lastIndex = this.checkPointProcedures.size() - 1;
        Tuples.Tuple2<Integer, Object> last = this.checkPointProcedures.get(lastIndex);
        SerializableStreamAssets.SerFunction existing = (SerializableStreamAssets.SerFunction) last._2();
        Object combined = existing == null ? serFunction : serFunction.compose(existing);
        this.checkPointProcedures.set(lastIndex, Tuples.Tuple2.tuple2(last._1(), combined));
    }
}
