package org.apache.reef.io.storage;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.reef.exception.evaluator.ServiceException;
import org.apache.reef.exception.evaluator.StorageException;
import org.apache.reef.io.Accumulable;
import org.apache.reef.io.Accumulator;
import org.apache.reef.io.Tuple;
import org.apache.reef.io.serialization.Serializer;

/* loaded from: input_file:org/apache/reef/io/storage/FramingTupleSerializer.class */
public class FramingTupleSerializer<K, V> implements Serializer<Tuple<K, V>, OutputStream> {
    private final Serializer<K, OutputStream> keySerializer;
    private final Serializer<V, OutputStream> valSerializer;

    public FramingTupleSerializer(Serializer<K, OutputStream> serializer, Serializer<V, OutputStream> serializer2) {
        this.keySerializer = serializer;
        this.valSerializer = serializer2;
    }

    public Accumulable<Tuple<K, V>> create(OutputStream outputStream) {
        final FramingOutputStream framingOutputStream = new FramingOutputStream(outputStream);
        return new Accumulable<Tuple<K, V>>() { // from class: org.apache.reef.io.storage.FramingTupleSerializer.1
            public Accumulator<Tuple<K, V>> accumulator() throws ServiceException {
                final Accumulator accumulator = FramingTupleSerializer.this.keySerializer.create(framingOutputStream).accumulator();
                final Accumulator accumulator2 = FramingTupleSerializer.this.valSerializer.create(framingOutputStream).accumulator();
                return new Accumulator<Tuple<K, V>>() { // from class: org.apache.reef.io.storage.FramingTupleSerializer.1.1
                    boolean first = true;

                    public void add(Tuple<K, V> tuple) throws ServiceException {
                        if (!this.first) {
                            framingOutputStream.nextFrame();
                        }
                        this.first = false;
                        accumulator.add(tuple.getKey());
                        framingOutputStream.nextFrame();
                        accumulator2.add(tuple.getValue());
                    }

                    public void close() throws ServiceException {
                        try {
                            accumulator.close();
                            accumulator2.close();
                            framingOutputStream.close();
                        } catch (IOException e) {
                            throw new StorageException(e);
                        }
                    }
                };
            }
        };
    }
}
