package org.apache.flink.languagebinding.api.java.common.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.languagebinding.api.java.common.PlanBinder;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/languagebinding/api/java/common/streaming/Streamer.class */
/**
 * Base class for exchanging data with an external process over a socket.
 *
 * <p>The external process drives the exchange by sending 4-byte big-endian signals (see
 * {@link #getInt(byte[], int)}): a request for more input, a notification that it finished or
 * failed, or - for any other value - a value that is handed to the {@link Receiver} together with
 * the collector. This class answers data requests with a 5-byte write notification and received
 * buffers with a 1-byte read confirmation.
 *
 * <p>Nothing in this class assigns {@link #socket}, {@link #in} or {@link #out}; implementations
 * of {@link #setupProcess()} are presumably expected to do so - confirm against the subclasses.
 */
public abstract class Streamer implements Serializable {
    protected static final Logger LOG = LoggerFactory.getLogger(Streamer.class);

    /** Signal: the external process requests the next buffer of the (single) input. */
    private static final int SIGNAL_BUFFER_REQUEST = 0;
    /** Signal: the external process requests the next buffer of the first input (group 0). */
    private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
    /** Signal: the external process requests the next buffer of the second input (group 1). */
    private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
    /** Signal: the external process is done. */
    private static final int SIGNAL_FINISHED = -1;
    /** Signal: the external process failed. */
    private static final int SIGNAL_ERROR = -2;
    /** Flag byte of a write notification marking the last buffer of an input. */
    private static final byte SIGNAL_LAST = 32;

    protected ServerSocket server;
    protected Socket socket;
    protected InputStream in;
    protected OutputStream out;
    protected int port;
    protected Sender sender;
    protected Receiver receiver;
    protected final AbstractRichFunction function;
    /** Scratch space holding the most recently read 4-byte signal. */
    private final byte[] buffer = new byte[4];
    /** Text appended verbatim to every failure message raised by this class. */
    protected StringBuilder msg = new StringBuilder();

    /**
     * Creates a streamer together with a sender and a receiver for the given function.
     *
     * @param abstractRichFunction function whose runtime context supplies the task name and the
     *     broadcast variables
     */
    public Streamer(AbstractRichFunction abstractRichFunction) {
        this.function = abstractRichFunction;
        this.sender = new Sender(abstractRichFunction);
        this.receiver = new Receiver(abstractRichFunction);
    }

    /**
     * Opens a server socket on an automatically allocated port and delegates to
     * {@link #setupProcess()}.
     *
     * @throws IOException if the server socket cannot be opened or the setup fails
     */
    public void open() throws IOException {
        this.server = new ServerSocket(0);
        setupProcess();
    }

    /**
     * Hook invoked by {@link #open()} after the server socket has been created.
     *
     * @throws IOException if the setup fails
     */
    public abstract void setupProcess() throws IOException;

    /**
     * Closes the socket, the server socket, the sender and the receiver.
     *
     * <p>Every resource is closed even if closing an earlier one fails, and sockets that were
     * never created are skipped.
     *
     * @throws IOException if closing one of the resources fails
     */
    public void close() throws IOException {
        try {
            if (this.socket != null) {
                this.socket.close();
            }
        } finally {
            try {
                // The server socket is created in open(); closing it again is harmless if a
                // subclass already closed it.
                if (this.server != null) {
                    this.server.close();
                }
            } finally {
                try {
                    this.sender.close();
                } finally {
                    this.receiver.close();
                }
            }
        }
    }

    /**
     * Tells the external process how many bytes were written and whether more data follows.
     *
     * @param size value sent as a 4-byte big-endian int (the result of the preceding send call)
     * @param hasNext {@code true} if more data follows; otherwise the flag byte is
     *     {@link #SIGNAL_LAST}
     * @throws IOException if writing to the output stream fails
     */
    private void sendWriteNotification(int size, boolean hasNext) throws IOException {
        byte[] notification = new byte[5];
        putInt(notification, 0, size);
        notification[4] = hasNext ? (byte) 0 : SIGNAL_LAST;
        this.out.write(notification, 0, notification.length);
        this.out.flush();
    }

    /**
     * Acknowledges a received buffer by writing a single zero byte.
     *
     * @throws IOException if writing to the output stream fails
     */
    private void sendReadConfirmation() throws IOException {
        this.out.write(new byte[1], 0, 1);
        this.out.flush();
    }

    /**
     * Reads the next 4-byte signal from the external process.
     *
     * <p>{@link InputStream#read(byte[], int, int)} may return fewer bytes than requested, so this
     * loops until the signal is complete instead of decoding a partially filled buffer.
     *
     * @return the decoded signal
     * @throws IOException if the stream ends before a complete signal was read, or on any other
     *     I/O failure
     */
    private int readSignal() throws IOException {
        int filled = 0;
        while (filled < this.buffer.length) {
            int count = this.in.read(this.buffer, filled, this.buffer.length - filled);
            if (count < 0) {
                throw new IOException("External process for task " + taskName()
                        + " closed the connection unexpectedly." + this.msg);
            }
            filled += count;
        }
        return getInt(this.buffer, 0);
    }

    /**
     * Reads the next signal and fails if it is {@link #SIGNAL_ERROR}.
     *
     * @throws IOException if reading the signal fails
     * @throws RuntimeException if the external process signalled an error
     */
    private void awaitRequest() throws IOException {
        if (readSignal() == SIGNAL_ERROR) {
            throw processFailure(" terminated prematurely.");
        }
    }

    /**
     * Waits two seconds and then builds the exception describing a failed external process.
     *
     * <p>NOTE(review): the delay presumably gives the error output of the external process time
     * to arrive in {@link #msg} - confirm against the code that fills {@code msg}.
     *
     * @param reason text placed between the task name and the contents of {@link #msg}
     * @return the exception for the caller to throw
     */
    private RuntimeException processFailure(String reason) {
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        return new RuntimeException("External process for task " + taskName() + reason + this.msg);
    }

    /** Builds the exception raised when reading from the external process timed out. */
    private RuntimeException processTimeout() {
        return new RuntimeException("External process for task " + taskName() + " stopped responding." + this.msg);
    }

    /** Returns the task name reported by the function's runtime context. */
    private String taskName() {
        return this.function.getRuntimeContext().getTaskName();
    }

    /**
     * Sends all broadcast variables named in the configuration to the external process.
     *
     * <p>First the number of variables is sent, then for each variable its name followed by its
     * contents. Each send waits for a signal from the external process first.
     *
     * @param configuration configuration holding the broadcast variable count and names
     * @throws IOException if the communication fails
     * @throws RuntimeException if the external process signals an error or stops responding
     */
    public final void sendBroadCastVariables(Configuration configuration) throws IOException {
        try {
            int broadcastCount = configuration.getInteger(PlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT, 0);
            String[] names = new String[broadcastCount];
            for (int i = 0; i < names.length; i++) {
                names[i] = configuration.getString(PlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + i, (String) null);
            }

            awaitRequest();
            sendWriteNotification(this.sender.sendRecord(Integer.valueOf(broadcastCount)), false);

            for (String name : names) {
                Iterator values = this.function.getRuntimeContext().getBroadcastVariable(name).iterator();

                awaitRequest();
                sendWriteNotification(this.sender.sendRecord(name), false);

                // Keep sending until both the iterator and the sender are drained.
                while (values.hasNext() || this.sender.hasRemaining(0)) {
                    awaitRequest();
                    int size = this.sender.sendBuffer(values, 0);
                    sendWriteNotification(size, values.hasNext() || this.sender.hasRemaining(0));
                }
                this.sender.reset();
            }
        } catch (SocketTimeoutException e) {
            throw processTimeout();
        }
    }

    /**
     * Streams a single input to the external process and collects its results.
     *
     * <p>Returns immediately if the iterator is empty; otherwise serves signals until the
     * external process reports that it finished.
     *
     * @param it input values
     * @param collector collector handed to the receiver for incoming buffers
     * @throws IOException if the communication fails
     * @throws RuntimeException if the external process signals an error, requests data although
     *     none is left, or stops responding
     */
    public final void streamBufferWithoutGroups(Iterator it, Collector collector) throws IOException {
        try {
            if (!it.hasNext()) {
                return;
            }
            while (true) {
                int signal = readSignal();
                switch (signal) {
                    case SIGNAL_ERROR:
                        throw processFailure(" terminated prematurely due to an error.");
                    case SIGNAL_FINISHED:
                        return;
                    case SIGNAL_BUFFER_REQUEST:
                        if (!it.hasNext() && !this.sender.hasRemaining(0)) {
                            throw new RuntimeException("External process requested data even though none is available.");
                        }
                        int size = this.sender.sendBuffer(it, 0);
                        sendWriteNotification(size, this.sender.hasRemaining(0) || it.hasNext());
                        break;
                    default:
                        // Any other value is passed on to the receiver along with the collector.
                        this.receiver.collectBuffer(collector, signal);
                        sendReadConfirmation();
                        break;
                }
            }
        } catch (SocketTimeoutException e) {
            throw processTimeout();
        }
    }

    /**
     * Streams two inputs to the external process and collects its results.
     *
     * <p>Returns immediately if both iterators are empty; otherwise serves signals until the
     * external process reports that it finished. A request for an input that is already
     * drained is ignored without sending a notification.
     *
     * @param it values of the first input (group 0)
     * @param it2 values of the second input (group 1)
     * @param collector collector handed to the receiver for incoming buffers
     * @throws IOException if the communication fails
     * @throws RuntimeException if the external process signals an error or stops responding
     */
    public final void streamBufferWithGroups(Iterator it, Iterator it2, Collector collector) throws IOException {
        try {
            if (!it.hasNext() && !it2.hasNext()) {
                return;
            }
            while (true) {
                int signal = readSignal();
                switch (signal) {
                    case SIGNAL_BUFFER_REQUEST_G1:
                        if (it2.hasNext() || this.sender.hasRemaining(1)) {
                            int size = this.sender.sendBuffer(it2, 1);
                            sendWriteNotification(size, this.sender.hasRemaining(1) || it2.hasNext());
                        }
                        break;
                    case SIGNAL_BUFFER_REQUEST_G0:
                        if (it.hasNext() || this.sender.hasRemaining(0)) {
                            int size = this.sender.sendBuffer(it, 0);
                            sendWriteNotification(size, this.sender.hasRemaining(0) || it.hasNext());
                        }
                        break;
                    case SIGNAL_ERROR:
                        throw processFailure(" terminated prematurely due to an error.");
                    case SIGNAL_FINISHED:
                        return;
                    default:
                        // Any other value is passed on to the receiver along with the collector.
                        this.receiver.collectBuffer(collector, signal);
                        sendReadConfirmation();
                        break;
                }
            }
        } catch (SocketTimeoutException e) {
            throw processTimeout();
        }
    }

    /**
     * Decodes a big-endian 32-bit integer.
     *
     * @param bArr source array
     * @param i index of the most significant byte
     * @return the decoded value
     */
    protected static final int getInt(byte[] bArr, int i) {
        return (bArr[i] << 24) | ((bArr[i + 1] & 255) << 16) | ((bArr[i + 2] & 255) << 8) | (bArr[i + 3] & 255);
    }

    /**
     * Encodes a 32-bit integer in big-endian byte order.
     *
     * @param bArr target array
     * @param i index receiving the most significant byte
     * @param i2 value to encode
     */
    protected static final void putInt(byte[] bArr, int i, int i2) {
        bArr[i] = (byte) (i2 >> 24);
        bArr[i + 1] = (byte) (i2 >> 16);
        bArr[i + 2] = (byte) (i2 >> 8);
        bArr[i + 3] = (byte) i2;
    }
}
