package org.apache.flink.python.api.streaming.data;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.python.api.PythonPlanBinder;

/* loaded from: input_file:org/apache/flink/python/api/streaming/data/PythonSender.class */
public class PythonSender<IN> implements Serializable {
    public static final byte TYPE_ARRAY = 63;
    public static final byte TYPE_KEY_VALUE = 62;
    public static final byte TYPE_VALUE_VALUE = 61;
    private File outputFile;
    private RandomAccessFile outputRAF;
    private FileChannel outputChannel;
    private MappedByteBuffer fileBuffer;
    private final ByteBuffer[] saved = new ByteBuffer[2];
    private final Serializer[] serializer = new Serializer[2];

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/python/api/streaming/data/PythonSender$ArraySerializer.class */
    /**
     * Serializer for records that are a single {@code byte[]}: writes one marker byte (63)
     * followed by the array contents.
     */
    public class ArraySerializer extends PythonSender<IN>.Serializer<byte[]> {
        private ArraySerializer() {
            super();
        }

        /**
         * Replaces {@code buffer} with a freshly allocated buffer holding the marker byte
         * and a copy of {@code bArr}.
         *
         * @param bArr the record bytes, copied verbatim after the marker
         */
        @Override // org.apache.flink.python.api.streaming.data.PythonSender.Serializer
        public void serializeInternal(byte[] bArr) {
            final int markerSize = 1;
            final ByteBuffer target = ByteBuffer.allocate(bArr.length + markerSize);
            this.buffer = target;
            target.put((byte) 63); // same value as TYPE_ARRAY
            target.put(bArr);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/python/api/streaming/data/PythonSender$KeyValuePairSerializer.class */
    /**
     * Serializer for (key tuple, value) records. Layout written: marker byte (62), one byte
     * holding the key arity, every key field in order, then the value bytes.
     */
    public class KeyValuePairSerializer extends PythonSender<IN>.Serializer<Tuple2<Tuple, byte[]>> {
        private KeyValuePairSerializer() {
            super();
        }

        /**
         * Replaces {@code buffer} with a freshly allocated buffer containing the encoded pair.
         *
         * @param tuple2 pair whose {@code f0} is a tuple of {@code byte[]} key fields and whose
         *               {@code f1} is the value bytes
         */
        @Override // org.apache.flink.python.api.streaming.data.PythonSender.Serializer
        public void serializeInternal(Tuple2<Tuple, byte[]> tuple2) {
            final Tuple key = tuple2.f0;
            final int arity = key.getArity();

            // First pass: total size of all key fields.
            int keyBytes = 0;
            for (int pos = 0; pos < arity; pos++) {
                final byte[] field = (byte[]) key.getField(pos);
                keyBytes += field.length;
            }

            final byte[] value = tuple2.f1;
            // Capacity reserves 5 bytes for the header although only 2 are written below;
            // the unused tail is cut off when the caller flips the buffer.
            final ByteBuffer target = ByteBuffer.allocate(5 + keyBytes + value.length);
            this.buffer = target;

            target.put((byte) 62); // same value as TYPE_KEY_VALUE
            target.put((byte) arity);
            // Second pass: copy the key fields, then the value.
            for (int pos = 0; pos < arity; pos++) {
                target.put((byte[]) key.getField(pos));
            }
            target.put(value);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/python/api/streaming/data/PythonSender$Serializer.class */
    /**
     * Base class of the record serializers in this file. A subclass fills {@link #buffer}
     * in {@link #serializeInternal(Object)}; {@link #serialize(Object)} then flips it so it
     * is ready to be read.
     *
     * @param <T> the record type this serializer accepts
     */
    public abstract class Serializer<T> {
        /** Buffer holding the most recently serialized record. */
        protected ByteBuffer buffer;

        private Serializer() {
        }

        /**
         * Serializes {@code t} and returns the resulting buffer, flipped for reading.
         *
         * @param t the record to serialize
         * @return the buffer written by {@link #serializeInternal(Object)}, positioned at 0
         *         with its limit at the end of the written data
         */
        public ByteBuffer serialize(T t) {
            serializeInternal(t);
            final ByteBuffer filled = this.buffer;
            filled.flip();
            return filled;
        }

        /**
         * Writes the encoded form of {@code t} into {@link #buffer}.
         *
         * @param t the record to serialize
         */
        public abstract void serializeInternal(T t);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/python/api/streaming/data/PythonSender$ValuePairSerializer.class */
    /**
     * Serializer for pairs of {@code byte[]} values: writes one marker byte (61), then the
     * first array, then the second array.
     */
    public class ValuePairSerializer extends PythonSender<IN>.Serializer<Tuple2<byte[], byte[]>> {
        private ValuePairSerializer() {
            super();
        }

        /**
         * Replaces {@code buffer} with a freshly allocated buffer containing the encoded pair.
         *
         * @param tuple2 pair of byte arrays; both are copied verbatim, {@code f0} first
         */
        @Override // org.apache.flink.python.api.streaming.data.PythonSender.Serializer
        public void serializeInternal(Tuple2<byte[], byte[]> tuple2) {
            final byte[] left = tuple2.f0;
            final byte[] right = tuple2.f1;
            final ByteBuffer target = ByteBuffer.allocate(1 + left.length + right.length);
            this.buffer = target;
            target.put((byte) 61); // same value as TYPE_VALUE_VALUE
            target.put(left);
            target.put(right);
        }
    }

    /**
     * Prepares this sender for use by creating and memory-mapping the file at the given path.
     *
     * @param str path of the file to create and map
     * @throws IOException if the file cannot be created, sized or mapped
     */
    public void open(String str) throws IOException {
        this.setupMappedFile(str);
    }

    /**
     * Creates the file at {@code str} (replacing an existing one), sizes it to 64 MiB
     * (67108864 bytes) and maps the whole file read/write into {@code fileBuffer}.
     *
     * <p>If sizing or mapping fails, the already opened {@link RandomAccessFile} is closed
     * before the exception propagates, so no file handle is leaked.
     *
     * @param str path of the file to create and map
     * @throws FileNotFoundException if the file cannot be opened for reading and writing
     * @throws IOException if the file cannot be created, sized or mapped
     */
    private void setupMappedFile(String str) throws FileNotFoundException, IOException {
        // Best-effort: the results of mkdirs/delete are intentionally left unchecked.
        new File(PythonPlanBinder.FLINK_TMP_DATA_DIR).mkdirs();
        this.outputFile = new File(str);
        if (this.outputFile.exists()) {
            this.outputFile.delete();
        }
        this.outputFile.createNewFile();

        this.outputRAF = new RandomAccessFile(str, "rw");
        boolean mapped = false;
        try {
            this.outputRAF.setLength(67108864L);
            // Touch the final byte of the file, then rewind to the start.
            this.outputRAF.seek(67108863L);
            this.outputRAF.writeByte(0);
            this.outputRAF.seek(0L);
            this.outputChannel = this.outputRAF.getChannel();
            this.fileBuffer = this.outputChannel.map(FileChannel.MapMode.READ_WRITE, 0L, 67108864L);
            mapped = true;
        } finally {
            if (!mapped) {
                // Closing the RandomAccessFile also closes its channel.
                try {
                    this.outputRAF.close();
                } catch (IOException ignored) {
                    // The failure that brought us here is the one worth reporting.
                }
            }
        }
    }

    /**
     * Releases the file resources held by this sender.
     *
     * @throws IOException if closing the underlying channel or file fails
     */
    public void close() throws IOException {
        this.closeMappedFile();
    }

    /**
     * Closes the output channel and the backing {@link RandomAccessFile}.
     *
     * <p>The file is closed even when closing the channel throws, and fields that were never
     * initialized (no successful open) are skipped instead of causing a
     * {@link NullPointerException}.
     *
     * @throws IOException if closing the channel or the file fails
     */
    private void closeMappedFile() throws IOException {
        try {
            if (this.outputChannel != null) {
                this.outputChannel.close();
            }
        } finally {
            if (this.outputRAF != null) {
                this.outputRAF.close();
            }
        }
    }

    /**
     * Forgets the serializer chosen for every slot and rewinds the mapped buffer.
     * Entries in {@code saved} are left untouched.
     */
    public void reset() {
        for (int slot = 0; slot < this.serializer.length; slot++) {
            this.serializer[slot] = null;
        }
        this.fileBuffer.clear();
    }

    /**
     * Serializes a single record into the start of the mapped buffer.
     *
     * <p>Uses serializer slot 0 and finishes with {@link #reset()}, which clears every
     * serializer slot and rewinds the mapped buffer.
     *
     * @param obj the record; must be a type accepted by {@code getSerializer}
     * @return the number of bytes written to the mapped buffer
     * @throws IOException declared for callers; not thrown by the visible code
     * @throws RuntimeException if the serialized record is larger than 67108864 bytes
     */
    public int sendRecord(Object obj) throws IOException {
        this.fileBuffer.clear();

        final Serializer recordSerializer = getSerializer(obj);
        this.serializer[0] = recordSerializer;

        final ByteBuffer payload = recordSerializer.serialize(obj);
        final boolean tooLarge = payload.remaining() > 67108864;
        if (tooLarge) {
            throw new RuntimeException("Serialized object does not fit into a single buffer.");
        }

        this.fileBuffer.put(payload);
        final int written = this.fileBuffer.position();
        reset();
        return written;
    }

    /**
     * Tells whether a serialized record is parked in the given slot, i.e. one that did not
     * fit into the mapped buffer during an earlier {@code sendBuffer} call.
     *
     * @param i slot index (0 or 1)
     * @return {@code true} if a saved record is pending for slot {@code i}
     */
    public boolean hasRemaining(int i) {
        final ByteBuffer pending = this.saved[i];
        return pending != null;
    }

    /**
     * Fills the mapped buffer with as many serialized records from {@code it} as fit.
     *
     * <p>If no serializer has been chosen for slot {@code i} yet, it is picked from the first
     * record. A record left over from the previous call ({@code saved[i]}) is written before
     * any further records are pulled. The first record that does not fit is parked in
     * {@code saved[i]} and ends the call. An empty iterator with nothing pending yields 0
     * rather than a {@link java.util.NoSuchElementException}.
     *
     * @param it source of records; every record must be of the type accepted by the slot's serializer
     * @param i slot index (0 or 1) selecting which serializer/saved entry to use
     * @return the number of bytes written to the mapped buffer
     * @throws IOException declared for callers; not thrown by the visible code
     * @throws RuntimeException if a serialized record is larger than 67108864 bytes
     */
    public int sendBuffer(Iterator it, int i) throws IOException {
        this.fileBuffer.clear();
        // NOTE(review): if reset() ran while saved[i] was still pending, the new first record
        // is written ahead of the saved one — confirm callers never rely on that ordering.
        if (this.serializer[i] == null && it.hasNext()) {
            Object first = it.next();
            this.serializer[i] = getSerializer(first);
            this.fileBuffer.put(serializeChecked(this.serializer[i], first));
        }
        if (this.saved[i] != null) {
            this.fileBuffer.put(this.saved[i]);
            this.saved[i] = null;
        }
        while (it.hasNext() && this.saved[i] == null) {
            ByteBuffer record = serializeChecked(this.serializer[i], it.next());
            if (record.remaining() <= this.fileBuffer.remaining()) {
                this.fileBuffer.put(record);
            } else {
                // Does not fit any more: park it for the next call and stop.
                this.saved[i] = record;
            }
        }
        return this.fileBuffer.position();
    }

    /** Serializes {@code value} and rejects results larger than the 67108864-byte mapped buffer. */
    private ByteBuffer serializeChecked(Serializer recordSerializer, Object value) {
        ByteBuffer serialized = recordSerializer.serialize(value);
        if (serialized.remaining() > 67108864) {
            throw new RuntimeException("Serialized object does not fit into a single buffer.");
        }
        return serialized;
    }

    /**
     * Picks the serializer matching the runtime type of {@code obj}:
     * {@code byte[]} → {@code ArraySerializer}; {@code Tuple2} whose first field is a
     * {@code byte[]} → {@code ValuePairSerializer}; {@code Tuple2} whose first field is a
     * {@code Tuple} → {@code KeyValuePairSerializer}.
     *
     * @param obj the record to find a serializer for
     * @return a new serializer instance for the record's type
     * @throws IllegalArgumentException if {@code obj} is {@code null} or of any other type
     */
    private Serializer getSerializer(Object obj) {
        if (obj instanceof byte[]) {
            return new ArraySerializer();
        }
        // Check the type before casting so unsupported inputs reach the
        // IllegalArgumentException below instead of failing with a ClassCastException
        // or NullPointerException.
        if (obj instanceof Tuple2) {
            Object first = ((Tuple2) obj).f0;
            if (first instanceof byte[]) {
                return new ValuePairSerializer();
            }
            if (first instanceof Tuple) {
                return new KeyValuePairSerializer();
            }
        }
        // String concatenation renders a null obj as "null" rather than throwing.
        throw new IllegalArgumentException("This object can't be serialized: " + obj);
    }
}
