package co.cask.cdap.data.runtime;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.common.stream.StreamEventCodec;
import co.cask.cdap.data.file.FileWriter;
import co.cask.cdap.data.stream.StreamFileWriterFactory;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.queue.QueueEntry;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionFailureException;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:co/cask/cdap/data/runtime/InMemoryStreamFileWriterFactory.class */
final class InMemoryStreamFileWriterFactory implements StreamFileWriterFactory {
    private static final StreamEventCodec STREAM_EVENT_CODEC = new StreamEventCodec();
    private final QueueClientFactory queueClientFactory;
    private final TransactionExecutorFactory executorFactory;

    @Inject
    InMemoryStreamFileWriterFactory(QueueClientFactory queueClientFactory, TransactionExecutorFactory transactionExecutorFactory) {
        this.queueClientFactory = queueClientFactory;
        this.executorFactory = transactionExecutorFactory;
    }

    @Override // co.cask.cdap.data.stream.StreamFileWriterFactory
    public FileWriter<StreamEvent> create(StreamConfig streamConfig, int i) throws IOException {
        final TransactionAware createProducer = this.queueClientFactory.createProducer(QueueName.fromStream(streamConfig.getName()));
        ArrayList newArrayList = Lists.newArrayList();
        if (createProducer instanceof TransactionAware) {
            newArrayList.add(createProducer);
        }
        final TransactionExecutor createExecutor = this.executorFactory.createExecutor(newArrayList);
        return new FileWriter<StreamEvent>() { // from class: co.cask.cdap.data.runtime.InMemoryStreamFileWriterFactory.1
            private final List<StreamEvent> events = Lists.newArrayList();

            @Override // co.cask.cdap.data.file.FileWriter
            public void append(StreamEvent streamEvent) throws IOException {
                this.events.add(streamEvent);
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                if (createProducer instanceof Closeable) {
                    ((Closeable) createProducer).close();
                }
            }

            @Override // java.io.Flushable
            public void flush() throws IOException {
                try {
                    createExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.data.runtime.InMemoryStreamFileWriterFactory.1.1
                        public void apply() throws Exception {
                            Iterator it = AnonymousClass1.this.events.iterator();
                            while (it.hasNext()) {
                                createProducer.enqueue(new QueueEntry(InMemoryStreamFileWriterFactory.STREAM_EVENT_CODEC.encodePayload((StreamEvent) it.next())));
                            }
                            AnonymousClass1.this.events.clear();
                        }
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException();
                } catch (TransactionFailureException e2) {
                    throw new IOException((Throwable) e2);
                }
            }
        };
    }
}
