package org.apache.nemo.runtime.executor.datatransfer;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import org.apache.nemo.common.coder.EncoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.class */
public final class NemoEventEncoderFactory implements EncoderFactory {
    private static final Logger LOG = LoggerFactory.getLogger(NemoEventEncoderFactory.class.getName());
    private final EncoderFactory valueEncoderFactory;

    /* loaded from: input_file:org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory$NemoEventEncoder.class */
    private final class NemoEventEncoder<T> implements EncoderFactory.Encoder<T> {
        private final EncoderFactory.Encoder<T> valueEncoder;
        private final OutputStream outputStream;

        NemoEventEncoder(EncoderFactory.Encoder<T> encoder, OutputStream outputStream) {
            this.valueEncoder = encoder;
            this.outputStream = outputStream;
        }

        public void encode(T t) throws IOException {
            if (t instanceof WatermarkWithIndex) {
                this.outputStream.write(1);
                this.outputStream.write(SerializationUtils.serialize((Serializable) t));
            } else {
                this.outputStream.write(0);
                this.valueEncoder.encode(t);
            }
        }
    }

    public NemoEventEncoderFactory(EncoderFactory encoderFactory) {
        this.valueEncoderFactory = encoderFactory;
    }

    public EncoderFactory.Encoder create(OutputStream outputStream) throws IOException {
        return new NemoEventEncoder(this.valueEncoderFactory.create(outputStream), outputStream);
    }
}
