package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.data.file.FileWriter;
import co.cask.cdap.data.file.FileWriters;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamFileType;
import co.cask.cdap.data.stream.StreamFileWriterFactory;
import co.cask.cdap.data.stream.StreamPropertyListener;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data.stream.TimestampCloseable;
import co.cask.cdap.data.stream.service.StreamMetricsCollectorFactory;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.proto.Id;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.twill.common.Cancellable;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter.class */
public final class ConcurrentStreamWriter implements Closeable {
    /** Logger shared by this class and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentStreamWriter.class);
    /** Used to register the stream property listener the first time a stream's queue is created. */
    private final StreamCoordinatorClient streamCoordinatorClient;
    /** Used to check stream existence and to look up the stream configuration. */
    private final StreamAdmin streamAdmin;
    /** Used as the concurrency level of {@link #eventQueues} and as the expected batch size in a write pass. */
    private final int workerThreads;
    /** Creates the per-stream metrics collector handed to each {@link EventQueue}. */
    private final StreamMetricsCollectorFactory metricsCollectorFactory;
    /** One {@link EventQueue} per stream; entries are removed on write failure, generation change or delete. */
    private final ConcurrentMap<Id.Stream, EventQueue> eventQueues;
    /** Creates file writers for a stream and doubles as the stream property listener. */
    private final StreamFileFactory streamFileFactory;
    /** Streams for which a listener has already been registered; only modified while holding {@link #createLock}. */
    private final Set<Id.Stream> generationWatched = Sets.newHashSet();
    /** Listener registrations, cancelled in {@link #close()}. */
    private final List<Cancellable> cancellables = Lists.newArrayList();
    /** Serializes creation of event queues in getEventQueue. */
    private final Lock createLock = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$BatchWriteRequest.class */
    /**
     * A write request carrying a sequence of events.
     *
     * <p>The request presents itself to the file writer as an {@link Iterator}: every event handed
     * out through {@link #next()} is counted against the {@link WriteRequest.Metrics} instance that
     * was supplied to the current {@link #write} call.
     */
    public static final class BatchWriteRequest extends WriteRequest implements Iterator<StreamEventData> {
        /** Caller-supplied events that are still to be written. */
        private final Iterator<? extends StreamEventData> source;
        /** Counters of the write pass in progress; assigned by {@link #write}. */
        private WriteRequest.Metrics counters;

        private BatchWriteRequest(Iterator<? extends StreamEventData> it) {
            super();
            this.source = it;
        }

        /**
         * Appends all remaining events to the given writer, counting each one in {@code metrics}.
         *
         * @throws IOException if the writer fails to append
         */
        @Override
        void write(FileWriter<StreamEventData> fileWriter, WriteRequest.Metrics metrics) throws IOException {
            this.counters = metrics;
            fileWriter.appendAll(this);
        }

        @Override
        public boolean hasNext() {
            return this.source.hasNext();
        }

        /** Returns the next event and records its body size in the current metrics. */
        @Override
        public StreamEventData next() {
            StreamEventData event = this.source.next();
            int bodySize = ((ByteBuffer) event.getBody()).remaining();
            this.counters.increment(bodySize);
            return event;
        }

        /** Always throws; removal is not supported. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Remove not supported");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$EventQueue.class */
    public final class EventQueue implements Closeable {
        /** Stream this queue writes to. */
        private final Id.Stream streamId;
        /** Receives byte/event counts after a write pass or a file append. */
        private final StreamMetricsCollectorFactory.StreamMetricsCollector metricsCollector;
        /** Wraps each event's data into the shared {@link #streamEvent} before it reaches the file writer. */
        private final Function<StreamEventData, StreamEvent> eventTransformer;
        /** Created on demand by getFileWriter(); closed and reset to null by tryAppendFile(). */
        private FileWriter<StreamEventData> fileWriter;
        /** Set to true by doClose(); once set, getFileWriter() and tryAppendFile() throw. */
        private boolean closed;
        /** Single reusable event; tryWrite() stamps its timestamp and the transformer sets its data. */
        private final MutableStreamEvent streamEvent = new MutableStreamEvent();
        /** Pending write requests, added by append() and drained by tryWrite() / doClose(). */
        private final Queue<WriteRequest> queue = new ConcurrentLinkedQueue();
        /** True while some thread is inside tryWrite(), tryAppendFile() or close(). */
        private final AtomicBoolean writerFlag = new AtomicBoolean(false);
        /** Counters reset at the start of every tryWrite() pass. */
        private final WriteRequest.Metrics metrics = new WriteRequest.Metrics();
        /**
         * Creates the queue for one stream.
         *
         * @param stream stream the queue belongs to
         * @param streamMetricsCollector collector that receives the write metrics of this queue
         */
        EventQueue(Id.Stream stream, StreamMetricsCollectorFactory.StreamMetricsCollector streamMetricsCollector) {
            // The transformer re-uses the single mutable event of this queue and only swaps its data.
            this.eventTransformer = new Function<StreamEventData, StreamEvent>() {
                @Override
                public StreamEvent apply(StreamEventData data) {
                    return EventQueue.this.streamEvent.setData(data);
                }
            };
            this.metricsCollector = streamMetricsCollector;
            this.streamId = stream;
        }

        /**
         * Queues a single event for writing.
         *
         * @param map headers of the event
         * @param byteBuffer body of the event
         * @return the pending request that tracks completion of the write
         */
        WriteRequest append(Map<String, String> map, ByteBuffer byteBuffer) {
            WriteRequest request = new SingleWriteRequest(map, byteBuffer);
            this.queue.add(request);
            return request;
        }

        /**
         * Queues a batch of events for writing as one request.
         *
         * @param it events to write
         * @return the pending request that tracks completion of the whole batch
         */
        WriteRequest append(Iterator<? extends StreamEventData> it) {
            WriteRequest request = new BatchWriteRequest(it);
            this.queue.add(request);
            return request;
        }

        /**
         * Tries to become the active writer and, if successful, appends a pre-built file to the stream.
         *
         * <p>The current file writer, if any, is closed and dropped, so the next write pass creates a
         * new one. The given {@code timestampCloseable} is closed and its close timestamp is passed
         * on to {@link StreamFileFactory#appendFile}.
         *
         * @param streamConfig configuration of the stream
         * @param location file whose length is reported as bytes written; moved to the event file
         * @param location2 file moved to the index file
         * @param j count reported to the metrics collector together with the file length
         * @param timestampCloseable closed before the append; supplies the close timestamp
         * @return {@code false} if another thread currently holds the writer flag (nothing was done),
         *         {@code true} if the file was appended
         * @throws IOException if this queue is already closed or the append fails
         */
        boolean tryAppendFile(StreamConfig streamConfig, Location location, Location location2, long j, TimestampCloseable timestampCloseable) throws IOException {
            if (!this.writerFlag.compareAndSet(false, true)) {
                return false;
            }
            long length;
            try {
                if (this.closed) {
                    throw new IOException("Stream writer already closed");
                }
                if (this.fileWriter != null) {
                    this.fileWriter.close();
                    this.fileWriter = null;
                }
                timestampCloseable.close();
                length = location.length();
                ConcurrentStreamWriter.this.streamFileFactory.appendFile(streamConfig, location, location2, timestampCloseable.getCloseTimestamp());
            } finally {
                // Release exactly once. Metrics are emitted after the release, and a failure there must
                // not reset the flag a second time: another thread may already own it by then.
                this.writerFlag.set(false);
            }
            this.metricsCollector.emitMetrics(length, j);
            return true;
        }

        /**
         * Tries to become the active writer and, if successful, drains the queue into the file writer.
         *
         * <p>All requests polled in this pass are completed together: with {@code null} after a
         * successful flush, or with the caught throwable if anything fails. On failure this queue is
         * also removed from the enclosing writer's map and closed.
         *
         * @return {@code false} if another thread currently holds the writer flag, {@code true} if
         *         this call performed a write pass (whether or not the pass succeeded)
         */
        boolean tryWrite() {
            if (!this.writerFlag.compareAndSet(false, true)) {
                return false;
            }
            try {
                int bytes = 0;
                int events = 0;
                this.metrics.reset();
                List<WriteRequest> handled = Lists.newArrayListWithExpectedSize(ConcurrentStreamWriter.this.workerThreads);
                try {
                    FileWriter<StreamEventData> writer = getFileWriter();
                    WriteRequest request = this.queue.poll();
                    // Every event written in this pass shares the same timestamp.
                    this.streamEvent.setTimestamp(System.currentTimeMillis());
                    for (; request != null; request = this.queue.poll()) {
                        handled.add(request);
                        request.write(writer, this.metrics);
                    }
                    writer.flush();
                    for (WriteRequest done : handled) {
                        done.completed(null);
                    }
                    bytes = this.metrics.bytesWritten;
                    events = this.metrics.eventsWritten;
                } catch (Throwable failure) {
                    // Conditional remove: only drops the mapping if it still points at this queue.
                    ConcurrentStreamWriter.this.eventQueues.remove(this.streamId, this);
                    doClose();
                    for (WriteRequest failed : handled) {
                        failed.completed(failure);
                    }
                }
                this.metricsCollector.emitMetrics(bytes, events);
                return true;
            } finally {
                this.writerFlag.set(false);
            }
        }

        /**
         * Returns the file writer of this queue, creating it on first use.
         *
         * @throws IOException if this queue has been closed or the writer cannot be created
         */
        private FileWriter<StreamEventData> getFileWriter() throws IOException {
            if (this.closed) {
                throw new IOException("Stream writer already closed");
            }
            FileWriter<StreamEventData> current = this.fileWriter;
            if (current == null) {
                FileWriter<StreamEvent> eventWriter = ConcurrentStreamWriter.this.streamFileFactory.create(this.streamId);
                current = FileWriters.transform(eventWriter, this.eventTransformer);
                this.fileWriter = current;
            }
            return current;
        }

        /**
         * Closes this queue. Returns immediately if it is already closed; otherwise spins (yielding)
         * until the writer flag can be taken, performs the close and releases the flag.
         */
        @Override
        public void close() throws IOException {
            if (this.closed) {
                return;
            }
            while (!this.writerFlag.compareAndSet(false, true)) {
                Thread.yield();
            }
            try {
                doClose();
            } finally {
                this.writerFlag.set(false);
            }
        }

        /**
         * Closes the file writer (ignoring errors), fails every request still in the queue with a
         * shared "Stream writer closed" exception and marks this queue as closed.
         */
        private void doClose() {
            if (this.fileWriter != null) {
                Closeables.closeQuietly(this.fileWriter);
            }
            WriteRequest pending = this.queue.poll();
            Throwable closedFailure = new IOException("Stream writer closed").fillInStackTrace();
            for (; pending != null; pending = this.queue.poll()) {
                pending.completed(closedFailure);
            }
            this.closed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$SingleWriteRequest.class */
    /** A write request carrying exactly one event. */
    public static final class SingleWriteRequest extends WriteRequest {
        /** The event to write, built from the headers and body given at construction. */
        private final StreamEventData event;

        SingleWriteRequest(Map<String, String> map, ByteBuffer byteBuffer) {
            super();
            this.event = new StreamEventData(map, byteBuffer);
        }

        /**
         * Counts the event in {@code metrics} and then appends it to the writer.
         *
         * @throws IOException if the writer fails to append
         */
        @Override
        void write(FileWriter<StreamEventData> fileWriter, WriteRequest.Metrics metrics) throws IOException {
            int bodySize = ((ByteBuffer) this.event.getBody()).remaining();
            metrics.increment(bodySize);
            fileWriter.append(this.event);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$StreamFileFactory.class */
    /**
     * Creates stream file writers and appends externally built files to a stream.
     *
     * <p>It is also the stream property listener of the enclosing writer: a generation change or a
     * delete closes and removes the event queue of the affected stream.
     */
    public final class StreamFileFactory extends StreamPropertyListener {
        private final StreamFileWriterFactory writerFactory;

        StreamFileFactory(StreamFileWriterFactory streamFileWriterFactory) {
            this.writerFactory = streamFileWriterFactory;
        }

        /** Closes the event queue of the stream whose generation changed. */
        @Override
        public void generationChanged(Id.Stream stream, int i) {
            ConcurrentStreamWriter.LOG.debug("Generation for stream '{}' changed to {} for stream writer", stream, i);
            closeEventQueue(stream);
        }

        /** Closes the event queue of the stream whose properties were deleted. */
        @Override
        public void deleted(Id.Stream stream) {
            ConcurrentStreamWriter.LOG.debug("Properties deleted for stream '{}' for stream writer", stream);
            closeEventQueue(stream);
        }

        /** Removes the queue of the given stream from the map, if present, and closes it; close failures are logged. */
        private void closeEventQueue(Id.Stream stream) {
            EventQueue removed = ConcurrentStreamWriter.this.eventQueues.remove(stream);
            if (removed == null) {
                return;
            }
            try {
                removed.close();
            } catch (IOException e) {
                ConcurrentStreamWriter.LOG.warn("Failed to close writer.", e);
            }
        }

        /**
         * Creates a file writer for the current generation of the given stream.
         *
         * @throws IOException if the configuration cannot be read or the writer cannot be created
         */
        FileWriter<StreamEvent> create(Id.Stream stream) throws IOException {
            StreamConfig streamConfig = ConcurrentStreamWriter.this.streamAdmin.getConfig(stream);
            int currentGeneration = StreamUtils.getGeneration(streamConfig);
            ConcurrentStreamWriter.LOG.info("Create stream writer for {} with generation {}", stream, currentGeneration);
            return this.writerFactory.create(streamConfig, currentGeneration);
        }

        /**
         * Moves an event file and its index file into the partition that contains timestamp {@code j}.
         *
         * <p>A new sequence id is taken in the partition directory, empty event and index files are
         * created for it, and then the index file is renamed onto its target, followed by the event file.
         *
         * @param streamConfig configuration of the target stream
         * @param location event file to move
         * @param location2 index file to move
         * @param j timestamp used to select the partition
         * @throws IOException if either target file cannot be newly created, or on other I/O failure
         */
        void appendFile(StreamConfig streamConfig, Location location, Location location2, long j) throws IOException {
            int generation = StreamUtils.getGeneration(streamConfig);
            Location generationDir = StreamUtils.createGenerationLocation(streamConfig.getLocation(), generation);
            long partitionDuration = streamConfig.getPartitionDuration();
            long partitionStart = StreamUtils.getPartitionStartTime(j, partitionDuration);
            Location partitionDir = StreamUtils.createPartitionLocation(generationDir, partitionStart, partitionDuration);
            partitionDir.mkdirs();

            String prefix = this.writerFactory.getFileNamePrefix();
            int sequenceId = StreamUtils.getNextSequenceId(partitionDir, prefix);
            Location targetEventFile = StreamUtils.createStreamLocation(partitionDir, prefix, sequenceId, StreamFileType.EVENT);
            Location targetIndexFile = StreamUtils.createStreamLocation(partitionDir, prefix, sequenceId, StreamFileType.INDEX);

            // Short-circuits exactly like the original: the index file is only attempted if the event file was created.
            boolean bothCreated = targetEventFile.createNew() && targetIndexFile.createNew();
            if (!bothCreated) {
                throw new IOException(String.format("Failed to create new file at %s and %s", targetEventFile.toURI(), targetIndexFile.toURI()));
            }
            // NOTE(review): the results of renameTo are ignored here, as in the original; confirm
            // whether a failed rename needs to be detected.
            location2.renameTo(targetIndexFile);
            location.renameTo(targetEventFile);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$WriteRequest.class */
    /**
     * Base class of a queued write. A request starts out pending and is completed exactly by a call
     * to {@link #completed(Throwable)}, made by whichever thread performs the write pass or closes the queue.
     *
     * <p>The submitting thread polls {@link #isCompleted()} while another thread may complete the
     * request, so {@code state} is volatile and {@code failure} is always written before it. A
     * reader that observes {@code COMPLETED} is therefore guaranteed to see the matching failure.
     */
    public static abstract class WriteRequest {
        /** Completion flag; volatile so that it safely publishes {@link #failure}. */
        private volatile State state;
        /** Cause of failure, or {@code null} on success; only meaningful once {@link #state} is COMPLETED. */
        private Throwable failure;

        /** Byte and event counters accumulated during one write pass. */
        public static final class Metrics {
            int bytesWritten;
            int eventsWritten;

            Metrics() {
            }

            /** Clears both counters. */
            void reset() {
                this.eventsWritten = 0;
                this.bytesWritten = 0;
            }

            /** Records one more event of {@code i} bytes. */
            void increment(int i) {
                this.bytesWritten += i;
                this.eventsWritten++;
            }
        }

        /** Lifecycle of a write request. */
        public enum State {
            PENDING,
            COMPLETED
        }

        private WriteRequest() {
            this.state = State.PENDING;
        }

        /** Returns whether {@link #completed(Throwable)} has been called. */
        boolean isCompleted() {
            return this.state != State.PENDING;
        }

        /** Returns whether the request completed without a failure. */
        boolean isSuccess() {
            return isCompleted() && this.failure == null;
        }

        /**
         * Marks this request as completed.
         *
         * @param th the failure, or {@code null} if the write succeeded
         */
        void completed(Throwable th) {
            // Write the failure first: the volatile store to state below publishes it, so no reader
            // can observe COMPLETED together with a stale null failure.
            this.failure = th;
            this.state = State.COMPLETED;
        }

        /** Returns the failure passed to {@link #completed(Throwable)}, or {@code null} if none. */
        Throwable getFailure() {
            return this.failure;
        }

        /**
         * Writes the payload of this request to the given writer, updating {@code metrics} per event.
         *
         * @throws IOException if the writer fails
         */
        abstract void write(FileWriter<StreamEventData> fileWriter, Metrics metrics) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a stream writer.
     *
     * @param streamCoordinatorClient client used to listen for stream property changes
     * @param streamAdmin admin used to check stream existence and read stream configuration
     * @param streamFileWriterFactory factory of the underlying stream file writers
     * @param i number of worker threads; used as the concurrency level of the queue map
     * @param streamMetricsCollectorFactory factory of per-stream metrics collectors
     */
    public ConcurrentStreamWriter(StreamCoordinatorClient streamCoordinatorClient, StreamAdmin streamAdmin, StreamFileWriterFactory streamFileWriterFactory, int i, StreamMetricsCollectorFactory streamMetricsCollectorFactory) {
        this.workerThreads = i;
        this.eventQueues = new MapMaker().concurrencyLevel(i).makeMap();
        this.streamAdmin = streamAdmin;
        this.streamCoordinatorClient = streamCoordinatorClient;
        this.metricsCollectorFactory = streamMetricsCollectorFactory;
        this.streamFileFactory = new StreamFileFactory(streamFileWriterFactory);
    }

    /**
     * Writes a single event to the stream and returns once it has been persisted.
     *
     * @param stream target stream
     * @param map headers of the event
     * @param byteBuffer body of the event
     * @throws IOException if the event could not be written
     * @throws IllegalArgumentException if the stream does not exist
     */
    public void enqueue(Id.Stream stream, Map<String, String> map, ByteBuffer byteBuffer) throws IOException {
        EventQueue targetQueue = getEventQueue(stream);
        WriteRequest pending = targetQueue.append(map, byteBuffer);
        persistUntilCompleted(stream, targetQueue, pending);
    }

    /**
     * Writes a batch of events to the stream as one request and returns once it has been persisted.
     *
     * @param stream target stream
     * @param it events to write
     * @throws IOException if the batch could not be written
     * @throws IllegalArgumentException if the stream does not exist
     */
    public void enqueue(Id.Stream stream, Iterator<? extends StreamEventData> it) throws IOException {
        EventQueue targetQueue = getEventQueue(stream);
        WriteRequest pending = targetQueue.append(it);
        persistUntilCompleted(stream, targetQueue, pending);
    }

    /**
     * Queues a single event on the calling thread and persists it on the given executor.
     * A failure of the asynchronous write is only logged.
     *
     * @param stream target stream
     * @param map headers of the event
     * @param byteBuffer body of the event
     * @param executor executor that runs the persist step
     * @throws IOException if the event queue of the stream cannot be obtained
     * @throws IllegalArgumentException if the stream does not exist
     */
    public void asyncEnqueue(final Id.Stream stream, Map<String, String> map, ByteBuffer byteBuffer, Executor executor) throws IOException {
        final EventQueue targetQueue = getEventQueue(stream);
        final WriteRequest pending = targetQueue.append(map, byteBuffer);
        Runnable persistTask = new Runnable() {
            @Override
            public void run() {
                try {
                    ConcurrentStreamWriter.this.persistUntilCompleted(stream, targetQueue, pending);
                } catch (IOException e) {
                    ConcurrentStreamWriter.LOG.error("Async write failed", e);
                }
            }
        };
        executor.execute(persistTask);
    }

    /**
     * Appends a pre-built event file and its index file to the stream, retrying (with a yield
     * between attempts) until the stream's queue accepts the append.
     *
     * @param stream target stream
     * @param location event file to append
     * @param location2 index file belonging to the event file
     * @param j count reported to the metrics collector for this append
     * @param timestampCloseable closed as part of the append; supplies the close timestamp
     * @throws IOException if the append fails
     * @throws IllegalArgumentException if the stream does not exist
     */
    public void appendFile(Id.Stream stream, Location location, Location location2, long j, TimestampCloseable timestampCloseable) throws IOException {
        EventQueue targetQueue = getEventQueue(stream);
        StreamConfig streamConfig = this.streamAdmin.getConfig(stream);
        boolean appended = targetQueue.tryAppendFile(streamConfig, location, location2, j, timestampCloseable);
        while (!appended) {
            Thread.yield();
            appended = targetQueue.tryAppendFile(streamConfig, location, location2, j, timestampCloseable);
        }
    }

    /**
     * Cancels all registered stream listeners and closes every event queue.
     * A queue that fails to close is logged and does not stop the remaining queues from closing.
     */
    @Override
    public void close() throws IOException {
        for (Cancellable registration : this.cancellables) {
            registration.cancel();
        }
        for (EventQueue openQueue : this.eventQueues.values()) {
            try {
                openQueue.close();
            } catch (IOException e) {
                LOG.warn("Failed to close writer.", e);
            }
        }
    }

    /**
     * Returns the event queue of the given stream, creating and registering it if necessary.
     *
     * <p>The map is checked once without the lock and, on a miss, again while holding
     * {@code createLock}, so that only one queue is created per stream. When a queue is created, a
     * stream property listener is registered for the stream unless one was registered before.
     *
     * @param stream stream to get the queue for
     * @return the existing or newly created queue
     * @throws IllegalArgumentException if the stream does not exist
     * @throws IOException if the queue cannot be created; other exceptions are wrapped in it
     */
    private EventQueue getEventQueue(Id.Stream stream) throws IOException {
        EventQueue existing = this.eventQueues.get(stream);
        if (existing != null) {
            return existing;
        }
        this.createLock.lock();
        try {
            existing = this.eventQueues.get(stream);
            if (existing != null) {
                return existing;
            }
            if (!this.streamAdmin.exists(stream)) {
                throw new IllegalArgumentException("Stream not exists");
            }
            StreamUtils.ensureExists(this.streamAdmin, stream);
            if (this.generationWatched.add(stream)) {
                this.cancellables.add(this.streamCoordinatorClient.addListener(stream, this.streamFileFactory));
            }
            EventQueue created = new EventQueue(stream, this.metricsCollectorFactory.createMetricsCollector(stream));
            this.eventQueues.put(stream, created);
            // No explicit unlock here: the finally block is the single release point. Unlocking
            // before the return as well made the finally unlock throw IllegalMonitorStateException.
            return created;
        } catch (Exception e) {
            Throwables.propagateIfInstanceOf(e, IllegalArgumentException.class);
            Throwables.propagateIfInstanceOf(e, IOException.class);
            throw new IOException(e);
        } finally {
            this.createLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drives the queue until the given request has been completed, either by this thread or by another one.
     *
     * <p>Each iteration tries to run a write pass; if another thread is currently the writer, the
     * calling thread yields and checks again.
     *
     * @param stream stream being written to (used in the error message)
     * @param eventQueue queue the request was added to
     * @param writeRequest request to wait for
     * @throws IOException the request's own failure if it is an {@code IOException}, otherwise a
     *         new {@code IOException} wrapping that failure
     */
    public void persistUntilCompleted(Id.Stream stream, EventQueue eventQueue, WriteRequest writeRequest) throws IOException {
        while (!writeRequest.isCompleted()) {
            boolean ranWritePass = eventQueue.tryWrite();
            if (!ranWritePass) {
                Thread.yield();
            }
        }
        if (!writeRequest.isSuccess()) {
            Throwables.propagateIfInstanceOf(writeRequest.getFailure(), IOException.class);
            throw new IOException("Unable to write stream event to " + stream, writeRequest.getFailure());
        }
    }
}
