package org.apache.beam.runners.samza.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/OpAdapter.class */
/**
 * Adapts an {@link Op} to the Samza function interfaces implemented by this class:
 * {@link AsyncFlatMapFunction} (element and side-input processing), {@link WatermarkFunction}
 * (watermark propagation) and {@link ScheduledFunction} (timer callbacks).
 *
 * <p>The wrapped op never returns its output directly. It reports it through an {@link OpEmitter};
 * this adapter buffers what is emitted during a call and hands the buffered messages back to the
 * caller when that call returns.
 *
 * <p>All runtime state is {@code transient} and is (re)created in {@link #init(Context)}; only the
 * wrapped op is part of the serialized form.
 *
 * <p>{@link #apply}, {@link #processWatermark}, {@link #getOutputWatermark} and {@link #onCallback}
 * are {@code synchronized} on this instance; {@link #init}, {@link #schedule} and {@link #close}
 * are not.
 *
 * @param <InT> element type consumed by the wrapped op
 * @param <OutT> element type produced by the wrapped op
 * @param <K> key type of the timers delivered to the wrapped op
 */
public class OpAdapter<InT, OutT, K> implements AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>>, WatermarkFunction<OpMessage<OutT>>, ScheduledFunction<KeyedTimerData<K>, OpMessage<OutT>>, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);

    /** The op all calls are delegated to. */
    private final Op<InT, OutT, K> op;

    /** Messages emitted synchronously by the op during the call currently in progress. */
    private transient List<OpMessage<OutT>> outputList;

    /** Output the op promised asynchronously during the current {@link #apply} call, if any. */
    private transient CompletionStage<Collection<OpMessage<OutT>>> outputFuture;

    /** Last watermark the op emitted; {@code null} until the op emits one. */
    private transient Instant outputWatermark;

    private transient OpEmitter<OutT> emitter;
    private transient Config config;
    private transient Context context;

    /** Emitter handed to the op; it records everything the op emits in the enclosing adapter. */
    private class OpEmitterImpl implements OpEmitter<OutT> {
        private OpEmitterImpl() {
        }

        /** Buffers a single output element. */
        @Override
        public void emitElement(WindowedValue<OutT> windowedValue) {
            OpAdapter.this.outputList.add(OpMessage.ofElement(windowedValue));
        }

        /**
         * Registers output that becomes available asynchronously. Each element is wrapped as an
         * element message once the stage completes. A later call replaces an earlier registration.
         */
        @Override
        public void emitFuture(CompletionStage<Collection<WindowedValue<OutT>>> completionStage) {
            OpAdapter.this.outputFuture = completionStage.thenApply(
                    elements -> elements.stream().map(OpMessage::ofElement).collect(Collectors.toList()));
        }

        /** Records the watermark later reported by {@link OpAdapter#getOutputWatermark()}. */
        @Override
        public void emitWatermark(Instant instant) {
            OpAdapter.this.outputWatermark = instant;
        }

        /** Buffers a side-input (view) message for the view identified by {@code str}. */
        @Override
        public <T> void emitView(String str, WindowedValue<Iterable<T>> windowedValue) {
            OpAdapter.this.outputList.add(OpMessage.ofSideInput(str, windowedValue));
        }
    }

    /**
     * Wraps {@code op} in an adapter.
     *
     * @param op the op to delegate to
     * @return a new adapter around {@code op}
     */
    public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(Op<InT, OutT, K> op) {
        return new OpAdapter<>(op);
    }

    private OpAdapter(Op<InT, OutT, K> op) {
        this.op = op;
    }

    /**
     * Creates the transient runtime state: a fresh output buffer and emitter, plus the job config
     * and context that {@link #schedule(Scheduler)} later passes to the op.
     *
     * @param context context to read the job config from and to hand to the op
     */
    @Override
    public final void init(Context context) {
        this.outputList = new ArrayList<>();
        this.emitter = new OpEmitterImpl();
        this.config = context.getJobContext().getConfig();
        this.context = context;
    }

    /**
     * Opens the wrapped op with the state captured by {@link #init(Context)}.
     *
     * @param scheduler timer scheduler passed through to the op
     */
    @Override
    public final void schedule(Scheduler<KeyedTimerData<K>> scheduler) {
        // The op is opened with the context captured in init(), so init() must have run first.
        assert this.context != null;
        this.op.open(this.config, this.context, scheduler, this.emitter);
    }

    /**
     * Dispatches one message to the wrapped op according to its type.
     *
     * @param opMessage the element, side-input or side-input-watermark message to process
     * @return a stage yielding the messages the op emitted synchronously during this call,
     *     followed by the asynchronously emitted ones if the op registered a future
     * @throws UserCodeException wrapping any exception raised while processing, including the
     *     {@link IllegalArgumentException} raised for an unexpected message type
     */
    @Override
    public synchronized CompletionStage<Collection<OpMessage<OutT>>> apply(OpMessage<InT> opMessage) {
        assert this.outputList.isEmpty();
        try {
            switch (opMessage.getType()) {
                case ELEMENT:
                    this.op.processElement(opMessage.getElement(), this.emitter);
                    break;
                case SIDE_INPUT:
                    this.op.processSideInput(opMessage.getViewId(), opMessage.getViewElements(), this.emitter);
                    break;
                case SIDE_INPUT_WATERMARK:
                    this.op.processSideInputWatermark(opMessage.getSideInputWatermark(), this.emitter);
                    break;
                default:
                    throw new IllegalArgumentException(String.format("Unexpected input type: %s", opMessage.getType()));
            }
            // Snapshot the buffer: the returned stage must not observe the clear() below.
            CompletionStage<Collection<OpMessage<OutT>>> result =
                    CompletableFuture.completedFuture(new ArrayList<>(this.outputList));
            if (this.outputFuture != null) {
                result = result.thenCombine(this.outputFuture, (emittedNow, emittedLater) -> {
                    emittedNow.addAll(emittedLater);
                    return emittedNow;
                });
            }
            return result;
        } catch (Exception e) {
            LOG.error("Op {} threw an exception during processing", this.op.getClass().getName(), e);
            throw UserCodeException.wrap(e);
        } finally {
            // Runs on failure too, so partial output of a failed call never leaks into the next.
            clearBufferedOutput();
            this.outputFuture = null;
        }
    }

    /**
     * Forwards a watermark to the wrapped op.
     *
     * @param j watermark timestamp in milliseconds, converted to an {@link Instant} for the op
     * @return the messages the op emitted while processing the watermark
     * @throws UserCodeException wrapping any exception raised while processing
     */
    @Override
    public synchronized Collection<OpMessage<OutT>> processWatermark(long j) {
        assert this.outputList.isEmpty();
        try {
            this.op.processWatermark(new Instant(j), this.emitter);
            return new ArrayList<>(this.outputList);
        } catch (Exception e) {
            LOG.error("Op {} threw an exception during processing watermark", this.op.getClass().getName(), e);
            throw UserCodeException.wrap(e);
        } finally {
            clearBufferedOutput();
        }
    }

    /**
     * Reports the last watermark emitted by the wrapped op.
     *
     * @return that watermark in milliseconds, or {@code null} if the op has not emitted one
     */
    @Override
    public synchronized Long getOutputWatermark() {
        return this.outputWatermark == null ? null : Long.valueOf(this.outputWatermark.getMillis());
    }

    /**
     * Delivers a fired timer to the wrapped op.
     *
     * @param keyedTimerData the timer that fired
     * @param j callback timestamp; not used by this adapter
     * @return the messages the op emitted while processing the timer
     * @throws UserCodeException wrapping any exception raised while processing
     */
    @Override
    public synchronized Collection<OpMessage<OutT>> onCallback(KeyedTimerData<K> keyedTimerData, long j) {
        assert this.outputList.isEmpty();
        try {
            this.op.processTimer(keyedTimerData, this.emitter);
            return new ArrayList<>(this.outputList);
        } catch (Exception e) {
            LOG.error("Op {} threw an exception during processing timer", this.op.getClass().getName(), e);
            throw UserCodeException.wrap(e);
        } finally {
            clearBufferedOutput();
        }
    }

    /** Closes the wrapped op. */
    @Override
    public void close() {
        this.op.close();
    }

    /**
     * Empties the output buffer. Null-guarded so that, when a call fails because
     * {@link #init(Context)} never ran, the cleanup does not replace the original failure with its
     * own {@link NullPointerException}.
     */
    private void clearBufferedOutput() {
        if (this.outputList != null) {
            this.outputList.clear();
        }
    }
}
