/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.impl.JetEvent;
import com.hazelcast.jet.impl.pipeline.AbstractStage;
import com.hazelcast.jet.impl.pipeline.FunctionAdapter;
import com.hazelcast.jet.impl.pipeline.JetEventFunctionAdapter;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.SinkImpl;
import com.hazelcast.jet.impl.pipeline.SinkStageImpl;
import com.hazelcast.jet.impl.pipeline.StreamStageImpl;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapStatefulTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapTransform;
import com.hazelcast.jet.impl.pipeline.transform.GlobalFlatMapStatefulTransform;
import com.hazelcast.jet.impl.pipeline.transform.GlobalMapStatefulTransform;
import com.hazelcast.jet.impl.pipeline.transform.HashJoinTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapStatefulTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapTransform;
import com.hazelcast.jet.impl.pipeline.transform.MergeTransform;
import com.hazelcast.jet.impl.pipeline.transform.PartitionedProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.PeekTransform;
import com.hazelcast.jet.impl.pipeline.transform.ProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.SortTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.GeneralStage;
import com.hazelcast.jet.pipeline.JoinClause;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkStage;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Common base of the pipeline compute stages. It holds the stage's
 * {@link FunctionAdapter} and offers the package-private {@code attachXxx}
 * helpers that wrap a user function with the adapter, build the matching
 * transform, connect it to the pipeline and return the new downstream stage.
 * <p>
 * The concrete type of the returned stage is decided by the subclass through
 * {@link #newStage}; that is why most helpers are generic in {@code RET}.
 *
 * @param <T> type of the items this stage emits
 */
public abstract class ComputeStageImplBase<T> extends AbstractStage {

    /** Adapter used by stages whose items have timestamps assigned (see {@link #addTimestamps}). */
    public static final FunctionAdapter ADAPT_TO_JET_EVENT = new JetEventFunctionAdapter();

    /** Concurrency limit passed to the batched async service transforms. */
    public static final int MAX_CONCURRENT_ASYNC_BATCHES = 2;

    /** Adapter used by stages that have no timestamps assigned. */
    static final FunctionAdapter DO_NOT_ADAPT = new FunctionAdapter();

    /** Adapter applied to every user-supplied function before it is handed to a transform. */
    @Nonnull
    public final FunctionAdapter fnAdapter;

    /** Rebalance flag given at construction; {@link #attachPeek} refuses to run when it is set. */
    final boolean isRebalanceOutput;

    /** Key function given together with the rebalance flag; {@code null} when none was supplied. */
    final FunctionEx<? super T, ?> rebalanceKeyFn;

    /**
     * Canonical constructor; all other constructors delegate here.
     *
     * @param transform       transform backing this stage
     * @param fnAdapter       adapter for user functions
     * @param pipelineImpl    owning pipeline
     * @param rebalanceOutput value for {@link #isRebalanceOutput}
     * @param rebalanceKeyFn  value for {@link #rebalanceKeyFn}, may be {@code null}
     */
    private ComputeStageImplBase(
            @Nonnull Transform transform,
            @Nonnull FunctionAdapter fnAdapter,
            @Nonnull PipelineImpl pipelineImpl,
            boolean rebalanceOutput,
            FunctionEx<? super T, ?> rebalanceKeyFn
    ) {
        super(transform, pipelineImpl);
        this.fnAdapter = fnAdapter;
        this.isRebalanceOutput = rebalanceOutput;
        this.rebalanceKeyFn = rebalanceKeyFn;
    }

    /** Creates a stage without rebalancing and without a rebalance key function. */
    ComputeStageImplBase(@Nonnull Transform transform, @Nonnull FunctionAdapter fnAdapter, @Nonnull PipelineImpl pipelineImpl) {
        this(transform, fnAdapter, pipelineImpl, false, null);
    }

    /** Copies the transform, adapter and pipeline of {@code toCopy}, using the given rebalance flag and no key function. */
    ComputeStageImplBase(ComputeStageImplBase<T> toCopy, boolean rebalanceOutput) {
        this(toCopy.transform, toCopy.fnAdapter, toCopy.pipelineImpl, rebalanceOutput, null);
    }

    /** Copies the transform, adapter and pipeline of {@code toCopy}, with rebalancing enabled and the given key function. */
    ComputeStageImplBase(ComputeStageImplBase<T> toCopy, FunctionEx<? super T, ?> rebalanceKeyFn) {
        this(toCopy.transform, toCopy.fnAdapter, toCopy.pipelineImpl, true, rebalanceKeyFn);
    }

    /**
     * Attaches a timestamp transform that wraps each item into a {@link JetEvent}
     * and returns a stream stage that uses {@link #ADAPT_TO_JET_EVENT}.
     *
     * @param timestampFn function extracting the timestamp from an item; must be serializable
     * @param allowedLag  lag handed to {@link WatermarkPolicy#limitingLag}
     * @return the new, timestamped stream stage
     * @throws RuntimeException raised by {@code Preconditions.checkTrue} (exact type not
     *         visible here) when this stage does not use {@link #DO_NOT_ADAPT}, i.e.
     *         timestamps were already assigned
     */
    @Nonnull
    public StreamStage<T> addTimestamps(@Nonnull ToLongFunctionEx<? super T> timestampFn, long allowedLag) {
        Preconditions.checkTrue(this.fnAdapter.equals(DO_NOT_ADAPT), "This stage already has timestamps assigned to it");
        Util.checkSerializable(timestampFn, "timestampFn");
        // NOTE(review): the trailing literals (0L, 0L, 60000L, (byte)0) are presumably watermark
        // throttling frame size/offset, idle timeout in ms and watermark key - confirm against
        // EventTimePolicy.eventTimePolicy.
        TimestampTransform<Object> tsTransform = new TimestampTransform<Object>(this.transform, EventTimePolicy.eventTimePolicy(timestampFn, (item, ts) -> JetEvent.jetEvent(ts, item), WatermarkPolicy.limitingLag(allowedLag), 0L, 0L, 60000L, (byte)0));
        this.pipelineImpl.connect(this, tsTransform);
        return new StreamStageImpl(tsTransform, ADAPT_TO_JET_EVENT, this.pipelineImpl);
    }

    /**
     * Attaches a {@link SortTransform} using the given comparator.
     *
     * @param comparator comparator passed to the transform; may be {@code null}
     */
    @Nonnull
    <RET> RET attachSort(@Nullable ComparatorEx<? super T> comparator) {
        return this.attach(new SortTransform<T>(this.transform, comparator), this.fnAdapter);
    }

    /**
     * Attaches a {@code "map"} transform applying the adapted {@code mapFn}.
     *
     * @param mapFn mapping function; must be serializable
     */
    @Nonnull
    <R, RET> RET attachMap(@Nonnull FunctionEx<? super T, ? extends R> mapFn) {
        Util.checkSerializable(mapFn, "mapFn");
        return this.attach(new MapTransform("map", this.transform, this.fnAdapter.adaptMapFn(mapFn)), this.fnAdapter);
    }

    /**
     * Attaches a {@code "filter"} transform. It is implemented as a map transform
     * that returns the item when the adapted predicate passes and {@code null} otherwise.
     *
     * @param filterFn predicate; must be serializable
     */
    @Nonnull
    <RET> RET attachFilter(@Nonnull PredicateEx<T> filterFn) {
        Util.checkSerializable(filterFn, "filterFn");
        PredicateEx<?> adaptedFn = this.fnAdapter.adaptFilterFn(filterFn);
        return this.attach(new MapTransform<Object, Object>("filter", this.transform, t -> adaptedFn.test(t) ? t : null), this.fnAdapter);
    }

    /**
     * Attaches a {@code "flat-map"} transform applying the adapted {@code flatMapFn}.
     *
     * @param flatMapFn function producing a traverser per item; must be serializable
     */
    @Nonnull
    <R, RET> RET attachFlatMap(@Nonnull FunctionEx<? super T, ? extends Traverser<R>> flatMapFn) {
        Util.checkSerializable(flatMapFn, "flatMapFn");
        return this.attach(new FlatMapTransform("flat-map", this.transform, this.fnAdapter.adaptFlatMapFn(flatMapFn)), this.fnAdapter);
    }

    /**
     * Attaches a global (non-keyed) stateful map transform. The two-argument
     * {@code mapFn} is wrapped into a three-argument function that ignores the key.
     *
     * @param createFn supplier of the state object; must be serializable
     * @param mapFn    function of (state, item); must be serializable
     */
    @Nonnull
    <S, R, RET> RET attachGlobalMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn) {
        Util.checkSerializable(createFn, "createFn");
        Util.checkSerializable(mapFn, "mapFn");
        GlobalMapStatefulTransform mapStatefulTransform = new GlobalMapStatefulTransform(this.transform, this.fnAdapter.adaptTimestampFn(), createFn, this.fnAdapter.adaptStatefulMapFn((s2, k, t) -> mapFn.apply((Object)s2, (Object)t)));
        return this.attach(mapStatefulTransform, this.fnAdapter);
    }

    /**
     * Attaches a global (non-keyed) stateful flat-map transform. The two-argument
     * {@code flatMapFn} is wrapped into a three-argument function that ignores the key.
     *
     * @param createFn  supplier of the state object; must be serializable
     * @param flatMapFn function of (state, item) producing a traverser; must be serializable
     */
    @Nonnull
    <S, R, RET> RET attachGlobalFlatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn) {
        Util.checkSerializable(createFn, "createFn");
        Util.checkSerializable(flatMapFn, "flatMapFn");
        GlobalFlatMapStatefulTransform mapStatefulTransform = new GlobalFlatMapStatefulTransform(this.transform, this.fnAdapter.adaptTimestampFn(), createFn, this.fnAdapter.adaptStatefulFlatMapFn((s2, k, t) -> (Traverser)flatMapFn.apply((Object)s2, (Object)t)));
        return this.attach(mapStatefulTransform, this.fnAdapter);
    }

    /**
     * Attaches a keyed stateful map transform.
     *
     * @param ttl       time-to-live; a positive value requires a timestamped stage
     * @param keyFn     key extractor; must be serializable
     * @param createFn  supplier of the per-key state; must be serializable
     * @param mapFn     function of (state, key, item); must be serializable
     * @param onEvictFn optional eviction function; adapted only when non-null
     * @throws IllegalStateException if {@code ttl > 0} and this stage uses {@link #DO_NOT_ADAPT}
     */
    @Nonnull
    <K, S, R, RET> RET attachMapStateful(long ttl, @Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends R> onEvictFn) {
        Util.checkSerializable(keyFn, "keyFn");
        Util.checkSerializable(createFn, "createFn");
        Util.checkSerializable(mapFn, "mapFn");
        this.ensureTtlAllowed(ttl);
        MapStatefulTransform mapStatefulTransform = new MapStatefulTransform(this.transform, ttl, this.fnAdapter.adaptKeyFn(keyFn), this.fnAdapter.adaptTimestampFn(), createFn, this.fnAdapter.adaptStatefulMapFn(mapFn), onEvictFn != null ? this.fnAdapter.adaptOnEvictFn(onEvictFn) : null);
        return this.attach(mapStatefulTransform, this.fnAdapter);
    }

    /**
     * Attaches a keyed stateful flat-map transform.
     *
     * @param ttl       time-to-live; a positive value requires a timestamped stage
     * @param keyFn     key extractor; must be serializable
     * @param createFn  supplier of the per-key state; must be serializable
     * @param flatMapFn function of (state, key, item) producing a traverser; must be serializable
     * @param onEvictFn optional eviction function; adapted only when non-null
     * @throws IllegalStateException if {@code ttl > 0} and this stage uses {@link #DO_NOT_ADAPT}
     */
    @Nonnull
    <K, S, R, RET> RET attachFlatMapStateful(long ttl, @Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn) {
        Util.checkSerializable(keyFn, "keyFn");
        Util.checkSerializable(createFn, "createFn");
        // Report the parameter under its real name so the error message points at the right argument.
        Util.checkSerializable(flatMapFn, "flatMapFn");
        this.ensureTtlAllowed(ttl);
        FlatMapStatefulTransform flatMapStatefulTransform = new FlatMapStatefulTransform(this.transform, ttl, this.fnAdapter.adaptKeyFn(keyFn), this.fnAdapter.adaptTimestampFn(), createFn, this.fnAdapter.adaptStatefulFlatMapFn(flatMapFn), onEvictFn != null ? this.fnAdapter.adaptOnEvictFlatMapFn(onEvictFn) : null);
        return this.attach(flatMapStatefulTransform, this.fnAdapter);
    }

    /**
     * Rejects a positive time-to-live on a stage that uses {@link #DO_NOT_ADAPT},
     * i.e. one whose items carry no timestamps.
     *
     * @throws IllegalStateException if {@code ttl > 0} and this stage is not timestamped
     */
    private void ensureTtlAllowed(long ttl) {
        if (ttl > 0L && this.fnAdapter == DO_NOT_ADAPT) {
            throw new IllegalStateException("Cannot use time-to-live on a non-timestamped stream");
        }
    }

    /**
     * Attaches a map-using-service transform.
     *
     * @param serviceFactory service factory; its attached files are moved to the pipeline
     * @param mapFn          function of (service, item); must be serializable
     */
    @Nonnull
    <S, R, RET> RET attachMapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn) {
        Util.checkSerializable(mapFn, "mapFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiFunctionEx<? super S, ?, ?> adaptedMapFn = this.fnAdapter.adaptMapUsingServiceFn(mapFn);
        return this.attach(ProcessorTransform.mapUsingServiceTransform(this.transform, serviceFactory, adaptedMapFn), this.fnAdapter);
    }

    /**
     * Attaches a filter-using-service transform.
     *
     * @param serviceFactory service factory; its attached files are moved to the pipeline
     * @param filterFn       predicate of (service, item); must be serializable
     */
    @Nonnull
    <S, RET> RET attachFilterUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiPredicateEx<? super S, ? super T> filterFn) {
        Util.checkSerializable(filterFn, "filterFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiPredicateEx<? super S, ?> adaptedFilterFn = this.fnAdapter.adaptFilterUsingServiceFn(filterFn);
        return this.attach(ProcessorTransform.filterUsingServiceTransform(this.transform, serviceFactory, adaptedFilterFn), this.fnAdapter);
    }

    /**
     * Attaches a flat-map-using-service transform.
     *
     * @param serviceFactory service factory; its attached files are moved to the pipeline
     * @param flatMapFn      function of (service, item) producing a traverser; must be serializable
     */
    @Nonnull
    <S, R, RET> RET attachFlatMapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn) {
        Util.checkSerializable(flatMapFn, "flatMapFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiFunctionEx<? super S, ?, Traverser<?>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingServiceFn(flatMapFn);
        return this.attach(ProcessorTransform.flatMapUsingServiceTransform(this.transform, serviceFactory, adaptedFlatMapFn), this.fnAdapter);
    }

    /**
     * Attaches an async service transform named {@code "map"}.
     *
     * @param serviceFactory   service factory; its attached files are moved to the pipeline
     * @param maxConcurrentOps passed through to the transform
     * @param preserveOrder    passed through to the transform
     * @param flatMapAsyncFn   function of (service, item) returning a future of a traverser;
     *                         must be serializable (reported as {@code "mapAsyncFn"} on failure)
     */
    @Nonnull
    <S, R, RET> RET attachMapUsingServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> flatMapAsyncFn) {
        Util.checkSerializable(flatMapAsyncFn, "mapAsyncFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiFunctionEx<? super S, ?, CompletableFuture<Traverser<?>>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingServiceAsyncFn(flatMapAsyncFn);
        ProcessorTransform processorTransform = ProcessorTransform.flatMapUsingServiceAsyncTransform(this.transform, "map", serviceFactory, maxConcurrentOps, preserveOrder, adaptedFlatMapFn);
        return this.attach(processorTransform, this.fnAdapter);
    }

    /**
     * Attaches a batched async service transform named {@code "map"}. The per-batch
     * list of traversers returned by the adapted function is flattened into a single
     * traverser before being handed to the transform.
     *
     * @param serviceFactory        service factory; its attached files are moved to the pipeline
     * @param maxBatchSize          passed through to the transform
     * @param flatMapAsyncBatchedFn function of (service, batch) returning a future of a list of
     *                              traversers; must be serializable (reported as
     *                              {@code "mapAsyncBatchedFn"} on failure)
     */
    @Nonnull
    <S, R, RET> RET attachMapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?, S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<Traverser<R>>>> flatMapAsyncBatchedFn) {
        Util.checkSerializable(flatMapAsyncBatchedFn, "mapAsyncBatchedFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiFunctionEx adaptedFn = this.fnAdapter.adaptFlatMapUsingServiceAsyncBatchedFn(flatMapAsyncBatchedFn);
        BiFunctionEx<Object, List, CompletableFuture> flattenedFn = (svc, items) -> {
            CompletableFuture f = (CompletableFuture)adaptedFn.apply((Object)svc, (List<?>)items);
            return f.thenApply(res -> Traversers.traverseIterable(res).flatMap(Function.identity()));
        };
        ProcessorTransform processorTransform = ProcessorTransform.flatMapUsingServiceAsyncBatchedTransform(this.transform, "map", serviceFactory, MAX_CONCURRENT_ASYNC_BATCHES, maxBatchSize, flattenedFn);
        return this.attach(processorTransform, this.fnAdapter);
    }

    /**
     * Attaches a partitioned map-using-service transform.
     *
     * @param serviceFactory service factory; its attached files are moved to the pipeline
     * @param partitionKeyFn partitioning key extractor; must be serializable
     * @param mapFn          function of (service, item); must be serializable
     */
    @Nonnull
    <S, K, R, RET> RET attachMapUsingPartitionedService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn) {
        Util.checkSerializable(mapFn, "mapFn");
        Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiFunctionEx<? super S, ?, ?> adaptedMapFn = this.fnAdapter.adaptMapUsingServiceFn(mapFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.mapUsingServicePartitionedTransform(this.transform, serviceFactory, adaptedMapFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    /**
     * Attaches a partitioned filter-using-service transform.
     *
     * @param serviceFactory service factory; its attached files are moved to the pipeline
     * @param partitionKeyFn partitioning key extractor; must be serializable
     * @param filterFn       predicate of (service, item); must be serializable
     */
    @Nonnull
    <S, K, RET> RET attachFilterUsingPartitionedService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiPredicateEx<? super S, ? super T> filterFn) {
        Util.checkSerializable(filterFn, "filterFn");
        Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiPredicateEx<? super S, ?> adaptedFilterFn = this.fnAdapter.adaptFilterUsingServiceFn(filterFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.filterUsingServicePartitionedTransform(this.transform, serviceFactory, adaptedFilterFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    /**
     * Attaches a partitioned flat-map-using-service transform.
     *
     * @param serviceFactory service factory; its attached files are moved to the pipeline
     * @param partitionKeyFn partitioning key extractor; must be serializable
     * @param flatMapFn      function of (service, item) producing a traverser; must be serializable
     */
    @Nonnull
    <S, K, R, RET> RET attachFlatMapUsingPartitionedService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn) {
        Util.checkSerializable(flatMapFn, "flatMapFn");
        Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiFunctionEx<? super S, ?, Traverser<?>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingServiceFn(flatMapFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.flatMapUsingServicePartitionedTransform(this.transform, serviceFactory, adaptedFlatMapFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    /**
     * Attaches a partitioned async service transform named {@code "map"}. The
     * single-result {@code mapAsyncFn} is turned into a flat-map function by wrapping
     * each result in a singleton traverser.
     *
     * @param serviceFactory   service factory; its attached files are moved to the pipeline
     * @param maxConcurrentOps passed through to the transform
     * @param preserveOrder    passed through to the transform
     * @param partitionKeyFn   partitioning key extractor; must be serializable
     * @param mapAsyncFn       function of (service, item) returning a future; must be serializable
     */
    @Nonnull
    <S, K, R, RET> RET attachMapUsingPartitionedServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<R>> mapAsyncFn) {
        Util.checkSerializable(mapAsyncFn, "mapAsyncFn");
        Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiFunctionEx<Object, Object, CompletableFuture> flatMapAsyncFn = (s2, t) -> ((CompletableFuture)mapAsyncFn.apply((Object)s2, (Object)t)).thenApply(Traversers::singleton);
        BiFunctionEx<Object, ?, CompletableFuture<Traverser<?>>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingServiceAsyncFn(flatMapAsyncFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        PartitionedProcessorTransform<?, ? extends K> processorTransform = PartitionedProcessorTransform.flatMapUsingServiceAsyncPartitionedTransform(this.transform, "map", serviceFactory, maxConcurrentOps, preserveOrder, adaptedFlatMapFn, adaptedPartitionKeyFn);
        return this.attach(processorTransform, this.fnAdapter);
    }

    /**
     * Attaches a partitioned, batched async service transform named {@code "map"}.
     * Each element of the result list is wrapped in a singleton traverser, the function
     * is adapted, and the resulting list of traversers is flattened into one traverser.
     *
     * @param serviceFactory service factory; its attached files are moved to the pipeline
     * @param maxBatchSize   passed through to the transform
     * @param partitionKeyFn partitioning key extractor; must be serializable
     * @param mapAsyncFn     function of (service, batch) returning a future of a result list;
     *                       must be serializable
     */
    @Nonnull
    <S, K, R, RET> RET attachMapUsingPartitionedServiceAsyncBatched(@Nonnull ServiceFactory<?, S> serviceFactory, int maxBatchSize, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn) {
        Util.checkSerializable(mapAsyncFn, "mapAsyncFn");
        Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        serviceFactory = this.moveAttachedFilesToPipeline(serviceFactory);
        BiFunctionEx<Object, List, CompletableFuture> flatMapAsyncFn = (s2, items) -> ((CompletableFuture)mapAsyncFn.apply((Object)s2, (Object)items)).thenApply(list -> Util.toList(list, Traversers::singleton));
        BiFunctionEx<Object, List<?>, CompletableFuture<List<Traverser<?>>>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingServiceAsyncBatchedFn(flatMapAsyncFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        BiFunctionEx<Object, List, CompletableFuture> flattenedFn = (svc, items) -> {
            CompletableFuture f = (CompletableFuture)adaptedFlatMapFn.apply(svc, (List<?>)items);
            return f.thenApply(res -> Traversers.traverseIterable(res).flatMap(Function.identity()));
        };
        PartitionedProcessorTransform<?, ? extends K> processorTransform = PartitionedProcessorTransform.flatMapUsingServiceAsyncBatchedPartitionedTransform(this.transform, "map", serviceFactory, MAX_CONCURRENT_ASYNC_BATCHES, maxBatchSize, flattenedFn, adaptedPartitionKeyFn);
        return this.attach(processorTransform, this.fnAdapter);
    }

    /**
     * Registers the factory's attached files with the pipeline and returns the
     * factory obtained from {@code withoutAttachedFiles()}.
     */
    @Nonnull
    private <S> ServiceFactory<?, S> moveAttachedFilesToPipeline(@Nonnull ServiceFactory<?, S> serviceFactory) {
        this.pipelineImpl.attachFiles(serviceFactory.attachedFiles());
        return serviceFactory.withoutAttachedFiles();
    }

    /**
     * Attaches a {@link MergeTransform} combining this stage with {@code other}.
     *
     * @param other stage to merge with; it is cast to {@code ComputeStageImplBase}
     * @throws IllegalArgumentException if the two stages do not share the same adapter instance
     */
    @Nonnull
    <RET> RET attachMerge(@Nonnull GeneralStage<? extends T> other) {
        ComputeStageImplBase castOther = (ComputeStageImplBase)((Object)other);
        if (this.fnAdapter != castOther.fnAdapter) {
            throw new IllegalArgumentException("The merged stages must either both have or both not have timestamp definitions");
        }
        MergeTransform transform = new MergeTransform(this.transform, castOther.transform);
        return this.attach(transform, Collections.singletonList(other), this.fnAdapter);
    }

    /**
     * Attaches a {@link HashJoinTransform} joining this stage with one batch stage.
     *
     * @param stage1        the stage to join with
     * @param joinClause    join clause for {@code stage1}
     * @param mapToOutputFn function producing the output item; must be serializable
     */
    @Nonnull
    <K1, T1_IN, T1, R, RET> RET attachHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause, @Nonnull BiFunctionEx<T, T1, R> mapToOutputFn) {
        Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        return this.attach(new HashJoinTransform(Arrays.asList(this.transform, ComputeStageImplBase.transformOf(stage1)), Collections.singletonList(this.fnAdapter.adaptJoinClause(joinClause)), Collections.emptyList(), this.fnAdapter.adaptHashJoinOutputFn(mapToOutputFn)), Collections.singletonList(stage1), this.fnAdapter);
    }

    /**
     * Attaches a {@link HashJoinTransform} joining this stage with two batch stages.
     *
     * @param stage1        first stage to join with
     * @param joinClause1   join clause for {@code stage1}
     * @param stage2        second stage to join with
     * @param joinClause2   join clause for {@code stage2}
     * @param mapToOutputFn function producing the output item; must be serializable
     */
    @Nonnull
    <K1, T1_IN, T1, K2, T2_IN, T2, R, RET> RET attachHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2, @Nonnull TriFunction<T, T1, T2, R> mapToOutputFn) {
        Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        return this.attach(new HashJoinTransform(Arrays.asList(this.transform, ComputeStageImplBase.transformOf(stage1), ComputeStageImplBase.transformOf(stage2)), Arrays.asList(this.fnAdapter.adaptJoinClause(joinClause1), this.fnAdapter.adaptJoinClause(joinClause2)), Collections.emptyList(), this.fnAdapter.adaptHashJoinOutputFn(mapToOutputFn)), Arrays.asList(stage1, stage2), this.fnAdapter);
    }

    /**
     * Attaches a {@link PeekTransform}.
     *
     * @param shouldLogFn predicate passed to the transform after adaptation; must be serializable
     * @param toStringFn  function passed to the transform after adaptation; must be serializable
     * @throws JetException if this stage was created with the rebalance flag set
     */
    @Nonnull
    <RET> RET attachPeek(@Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull FunctionEx<? super T, ? extends CharSequence> toStringFn) {
        Util.checkSerializable(shouldLogFn, "shouldLogFn");
        Util.checkSerializable(toStringFn, "toStringFn");
        if (this.isRebalanceOutput) {
            throw new JetException("peek() not supported after rebalance()");
        }
        return this.attach(new PeekTransform(this.transform, this.fnAdapter.adaptFilterFn(shouldLogFn), this.fnAdapter.adaptToStringFn(toStringFn)), this.fnAdapter);
    }

    /**
     * Attaches a custom processor transform.
     *
     * @param stageName    name given to the transform
     * @param procSupplier supplier of the processors
     */
    @Nonnull
    <RET> RET attachCustomTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier) {
        return this.attach(ProcessorTransform.customProcessorTransform(stageName, this.transform, procSupplier), this.fnAdapter);
    }

    /**
     * Attaches a partitioned custom processor transform.
     *
     * @param stageName      name given to the transform
     * @param procSupplier   supplier of the processors
     * @param partitionKeyFn partitioning key extractor, adapted before use
     */
    @Nonnull
    <K, RET> RET attachPartitionedCustomTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn) {
        FunctionEx<?, ? extends K> adaptedKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.partitionedCustomProcessorTransform(stageName, this.transform, procSupplier, adaptedKeyFn), this.fnAdapter);
    }

    /**
     * Attaches a sink to this stage. The sink transform is told whether this stage
     * uses {@link #ADAPT_TO_JET_EVENT}.
     *
     * @param sink2 the sink; it is cast to {@link SinkImpl}
     * @return the new sink stage
     */
    @Nonnull
    public SinkStage writeTo(@Nonnull Sink<? super T> sink2) {
        SinkImpl sinkImpl = (SinkImpl)sink2;
        SinkTransform sinkTransform = new SinkTransform(sinkImpl, this.transform, this.fnAdapter == ADAPT_TO_JET_EVENT);
        SinkStageImpl output = new SinkStageImpl(sinkTransform, this.pipelineImpl);
        sinkImpl.onAssignToStage();
        this.pipelineImpl.connect(this, sinkTransform);
        return output;
    }

    /**
     * Connects {@code transform} to the pipeline with this stage plus
     * {@code moreInputStages} as inputs, then creates the downstream stage.
     *
     * @param transform       the transform to attach
     * @param moreInputStages additional input stages besides this one
     * @param fnAdapter       adapter for the new stage
     */
    @Nonnull
    final <RET> RET attach(@Nonnull AbstractTransform transform, @Nonnull List<? extends GeneralStage<?>> moreInputStages, @Nonnull FunctionAdapter fnAdapter) {
        this.pipelineImpl.connect(this, moreInputStages, transform);
        return this.newStage(transform, fnAdapter);
    }

    /** Same as the three-argument {@code attach}, with no additional input stages. */
    @Nonnull
    final <RET> RET attach(@Nonnull AbstractTransform transform, @Nonnull FunctionAdapter fnAdapter) {
        return this.attach(transform, Collections.emptyList(), fnAdapter);
    }

    /**
     * Creates the stage object that represents the output of the given transform.
     *
     * @param var1 the transform backing the new stage
     * @param var2 the function adapter the new stage should use
     */
    abstract <RET> RET newStage(@Nonnull AbstractTransform var1, @Nonnull FunctionAdapter var2);

    /**
     * Verifies that {@code stage} uses {@link #ADAPT_TO_JET_EVENT}.
     *
     * @param stage the stage to check
     * @param name  text placed at the start of the error message
     * @throws IllegalStateException if the stage uses a different adapter
     */
    static void ensureJetEvents(@Nonnull ComputeStageImplBase stage, @Nonnull String name) {
        if (stage.fnAdapter != ADAPT_TO_JET_EVENT) {
            throw new IllegalStateException(name + " is missing a timestamp definition. Call one of the .addTimestamps() methods on it before performing the aggregation.");
        }
    }
}

