/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.InputStream;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decodes the elements carried by inbound {@link BeamFnApi.Elements.Data} messages with a
 * {@link Coder} and hands each decoded element to a {@link FnDataReceiver}.
 *
 * <p>The observer is also an {@link InboundDataClient}: every method of that interface is
 * forwarded unchanged to the {@code readFuture} supplied at construction time, which tracks
 * whether the inbound stream is finished.
 *
 * <p>The element and byte counters are plain fields updated without any synchronization in this
 * class; NOTE(review): presumably {@link #accept} is only ever invoked from one thread at a time —
 * confirm against the caller.
 *
 * @param <T> type of the decoded elements
 */
public class BeamFnDataInboundObserver<T>
        implements Consumer<BeamFnApi.Elements.Data>, InboundDataClient {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataInboundObserver.class);

    /** Receives every element decoded from the inbound payloads. */
    private final FnDataReceiver<T> consumer;
    /** Decodes elements out of each payload's byte stream. */
    private final Coder<T> coder;
    /** Completion state of the inbound stream; all {@link InboundDataClient} calls delegate here. */
    private final InboundDataClient readFuture;
    /** Running total of payload bytes seen; only reported in the closing debug log. */
    private long byteCounter;
    /** Running total of decode attempts; only reported in the closing debug log. */
    private long counter;

    /**
     * Creates an observer whose completion state is backed by a fresh
     * {@link CompletableFutureInboundDataClient}.
     *
     * @param coder decodes elements from the inbound bytes
     * @param receiver receives each decoded element
     * @param <T> type of the decoded elements
     * @return a new observer wired to {@code coder} and {@code receiver}
     */
    public static <T> BeamFnDataInboundObserver<T> forConsumer(Coder<T> coder, FnDataReceiver<T> receiver) {
        return new BeamFnDataInboundObserver<>(coder, receiver, CompletableFutureInboundDataClient.create());
    }

    /**
     * @param coder decodes elements from the inbound bytes
     * @param consumer receives each decoded element
     * @param readFuture tracks and controls completion of the inbound stream
     */
    public BeamFnDataInboundObserver(Coder<T> coder, FnDataReceiver<T> consumer, InboundDataClient readFuture) {
        this.coder = coder;
        this.consumer = consumer;
        this.readFuture = readFuture;
    }

    /**
     * Processes one inbound data message.
     *
     * <ul>
     *   <li>If {@code readFuture} is already done, the message is dropped.
     *   <li>If the message's data is empty, the stream is treated as closed and
     *       {@code readFuture} is completed.
     *   <li>Otherwise elements are decoded one after another from the payload and passed to the
     *       consumer.
     * </ul>
     *
     * <p>Any {@link Exception} thrown while decoding or consuming is not rethrown; it is reported
     * through {@link InboundDataClient#fail(Throwable)} on {@code readFuture}.
     *
     * @param t the inbound data message
     */
    @Override
    public void accept(BeamFnApi.Elements.Data t) {
        if (this.readFuture.isDone()) {
            // Drop anything that arrives after the stream has already finished.
            return;
        }
        try {
            if (t.getData().isEmpty()) {
                LOG.debug("Closing stream for instruction {} and transform {} having consumed {} values {} bytes", t.getInstructionId(), t.getTransformId(), this.counter, this.byteCounter);
                this.readFuture.complete();
                return;
            }
            this.byteCounter += t.getData().size();
            InputStream inputStream = t.getData().newInput();
            // NOTE(review): loop termination relies on available() reaching 0 once the payload is
            // fully read — presumably true for the in-memory stream returned by newInput(); confirm.
            while (inputStream.available() > 0) {
                ++this.counter;
                // Typed as T (not Object) so the value can be passed to FnDataReceiver<T>.
                T value = this.coder.decode(inputStream);
                this.consumer.accept(value);
            }
        } catch (Exception e) {
            this.readFuture.fail(e);
        }
    }

    /** Delegates to {@code readFuture.awaitCompletion()}. */
    @Override
    public void awaitCompletion() throws Exception {
        this.readFuture.awaitCompletion();
    }

    /** Delegates to {@code readFuture.isDone()}. */
    @Override
    public boolean isDone() {
        return this.readFuture.isDone();
    }

    /** Delegates to {@code readFuture.cancel()}. */
    @Override
    public void cancel() {
        this.readFuture.cancel();
    }

    /** Delegates to {@code readFuture.complete()}. */
    @Override
    public void complete() {
        this.readFuture.complete();
    }

    /** Delegates to {@code readFuture.fail(t)}. */
    @Override
    public void fail(Throwable t) {
        this.readFuture.fail(t);
    }
}

