package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.common.metrics.MetricsCollector;
import co.cask.cdap.data.file.FileWriter;
import co.cask.cdap.data.stream.StreamCoordinator;
import co.cask.cdap.data.stream.StreamFileWriterFactory;
import co.cask.cdap.data.stream.StreamPropertyListener;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.twill.common.Cancellable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter.class */
public final class ConcurrentStreamWriter implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentStreamWriter.class);
    private final StreamCoordinator streamCoordinator;
    private final StreamAdmin streamAdmin;
    private final StreamMetaStore streamMetaStore;
    private final int workerThreads;
    private final MetricsCollector metricsCollector;
    private final ConcurrentMap<String, EventQueue> eventQueues;
    private final FileWriterSupplierFactory writerSupplierFactory;
    private final Set<String> generationWatched = Sets.newHashSet();
    private final List<Cancellable> cancellables = Lists.newArrayList();
    private final Lock createLock = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$EventQueue.class */
    public final class EventQueue implements Closeable {
        private final String streamName;
        private final Supplier<FileWriter<StreamEvent>> writerSupplier;
        private final Queue<HandlerStreamEventData> queue = new ConcurrentLinkedQueue();
        private final AtomicBoolean writerFlag = new AtomicBoolean(false);
        private final SettableStreamEvent streamEvent = new SettableStreamEvent();

        EventQueue(String str, Supplier<FileWriter<StreamEvent>> supplier) {
            this.streamName = str;
            this.writerSupplier = Suppliers.memoize(supplier);
        }

        HandlerStreamEventData add(Map<String, String> map, ByteBuffer byteBuffer) {
            HandlerStreamEventData handlerStreamEventData = new HandlerStreamEventData(map, byteBuffer);
            this.queue.add(handlerStreamEventData);
            return handlerStreamEventData;
        }

        boolean tryWrite() {
            if (!this.writerFlag.compareAndSet(false, true)) {
                return false;
            }
            int i = 0;
            int i2 = 0;
            ArrayList<HandlerStreamEventData> newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(ConcurrentStreamWriter.this.workerThreads);
            try {
                try {
                    FileWriter fileWriter = (FileWriter) this.writerSupplier.get();
                    HandlerStreamEventData poll = this.queue.poll();
                    long currentTimeMillis = System.currentTimeMillis();
                    while (poll != null) {
                        newArrayListWithExpectedSize.add(poll);
                        fileWriter.append(this.streamEvent.set(poll, currentTimeMillis));
                        poll = this.queue.poll();
                    }
                    fileWriter.flush();
                    for (HandlerStreamEventData handlerStreamEventData : newArrayListWithExpectedSize) {
                        handlerStreamEventData.completed(null);
                        i += handlerStreamEventData.getBody().remaining();
                    }
                    i2 = newArrayListWithExpectedSize.size();
                    this.writerFlag.set(false);
                } catch (Throwable th) {
                    ConcurrentStreamWriter.LOG.error("Failed to write to file for stream {}.", this.streamName, th);
                    ConcurrentStreamWriter.this.eventQueues.remove(this.streamName, this);
                    Closeables.closeQuietly((Closeable) this.writerSupplier.get());
                    Iterator it = newArrayListWithExpectedSize.iterator();
                    while (it.hasNext()) {
                        ((HandlerStreamEventData) it.next()).completed(th);
                    }
                    this.writerFlag.set(false);
                }
                if (i2 <= 0) {
                    return true;
                }
                ConcurrentStreamWriter.this.metricsCollector.increment("collect.events", i2, new String[]{this.streamName});
                ConcurrentStreamWriter.this.metricsCollector.increment("collect.bytes", i, new String[]{this.streamName});
                return true;
            } catch (Throwable th2) {
                this.writerFlag.set(false);
                throw th2;
            }
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            boolean z = false;
            while (!z) {
                if (this.writerFlag.compareAndSet(false, true)) {
                    try {
                        ((FileWriter) this.writerSupplier.get()).close();
                        HandlerStreamEventData poll = this.queue.poll();
                        Throwable fillInStackTrace = new IOException("Stream writer closed").fillInStackTrace();
                        while (poll != null) {
                            poll.completed(fillInStackTrace);
                            poll = this.queue.poll();
                        }
                        z = true;
                        this.writerFlag.set(false);
                    } catch (Throwable th) {
                        this.writerFlag.set(false);
                        throw th;
                    }
                } else {
                    Thread.yield();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$FileWriterSupplierFactory.class */
    public final class FileWriterSupplierFactory extends StreamPropertyListener {
        private final StreamFileWriterFactory writerFactory;
        private final Map<String, Integer> generations = Collections.synchronizedMap(Maps.newHashMap());

        FileWriterSupplierFactory(StreamFileWriterFactory streamFileWriterFactory) {
            this.writerFactory = streamFileWriterFactory;
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void generationChanged(String str, int i) {
            ConcurrentStreamWriter.LOG.debug("Generation for stream '{}' changed to {} for stream writer", str, Integer.valueOf(i));
            this.generations.put(str, Integer.valueOf(i));
            EventQueue eventQueue = (EventQueue) ConcurrentStreamWriter.this.eventQueues.remove(str);
            if (eventQueue != null) {
                try {
                    eventQueue.close();
                } catch (IOException e) {
                    ConcurrentStreamWriter.LOG.warn("Failed to close writer.", e);
                }
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void generationDeleted(String str) {
            ConcurrentStreamWriter.LOG.debug("Generation for stream '{}' deleted for stream writer", str);
            this.generations.remove(str);
        }

        Supplier<FileWriter<StreamEvent>> create(final String str) {
            return new Supplier<FileWriter<StreamEvent>>() { // from class: co.cask.cdap.data.stream.service.ConcurrentStreamWriter.FileWriterSupplierFactory.1
                /* renamed from: get, reason: merged with bridge method [inline-methods] */
                public FileWriter<StreamEvent> m23get() {
                    try {
                        StreamConfig config = ConcurrentStreamWriter.this.streamAdmin.getConfig(str);
                        Integer num = (Integer) FileWriterSupplierFactory.this.generations.get(str);
                        if (num == null) {
                            num = Integer.valueOf(StreamUtils.getGeneration(config));
                        }
                        ConcurrentStreamWriter.LOG.info("Create stream writer for {} with generation {}", str, num);
                        return FileWriterSupplierFactory.this.writerFactory.create(config, num.intValue());
                    } catch (IOException e) {
                        throw Throwables.propagate(e);
                    }
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$HandlerStreamEventData.class */
    public static final class HandlerStreamEventData extends StreamEventData {
        private State state;
        private Throwable failure;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$HandlerStreamEventData$State.class */
        public enum State {
            PENDING,
            COMPLETED
        }

        public HandlerStreamEventData(Map<String, String> map, ByteBuffer byteBuffer) {
            super(map, byteBuffer);
            this.state = State.PENDING;
        }

        public boolean isCompleted() {
            return this.state != State.PENDING;
        }

        public boolean isSuccess() {
            return isCompleted() && this.failure == null;
        }

        public void completed(Throwable th) {
            this.state = State.COMPLETED;
            this.failure = th;
        }

        public Throwable getFailure() {
            return this.failure;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/ConcurrentStreamWriter$SettableStreamEvent.class */
    public static final class SettableStreamEvent extends StreamEvent {
        private StreamEventData data;
        private long timestamp;

        private SettableStreamEvent() {
        }

        public StreamEvent set(StreamEventData streamEventData, long j) {
            this.data = streamEventData;
            this.timestamp = j;
            return this;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public ByteBuffer getBody() {
            return this.data.getBody();
        }

        public Map<String, String> getHeaders() {
            return this.data.getHeaders();
        }
    }

    public ConcurrentStreamWriter(StreamCoordinator streamCoordinator, StreamAdmin streamAdmin, StreamMetaStore streamMetaStore, StreamFileWriterFactory streamFileWriterFactory, int i, MetricsCollector metricsCollector) {
        this.streamCoordinator = streamCoordinator;
        this.streamAdmin = streamAdmin;
        this.streamMetaStore = streamMetaStore;
        this.workerThreads = i;
        this.metricsCollector = metricsCollector;
        this.eventQueues = new MapMaker().concurrencyLevel(i).makeMap();
        this.writerSupplierFactory = new FileWriterSupplierFactory(streamFileWriterFactory);
    }

    public void enqueue(String str, String str2, Map<String, String> map, ByteBuffer byteBuffer) throws IOException {
        EventQueue eventQueue = getEventQueue(str, str2);
        HandlerStreamEventData add = eventQueue.add(map, byteBuffer);
        persistUntilCompleted(eventQueue, add);
        if (add.isSuccess()) {
            return;
        }
        Throwables.propagateIfInstanceOf(add.getFailure(), IOException.class);
        throw new IOException("Unable to write stream event to " + str2, add.getFailure());
    }

    public void asyncEnqueue(String str, String str2, Map<String, String> map, ByteBuffer byteBuffer, Executor executor) throws IOException {
        final EventQueue eventQueue = getEventQueue(str, str2);
        final HandlerStreamEventData add = eventQueue.add(map, byteBuffer);
        executor.execute(new Runnable() { // from class: co.cask.cdap.data.stream.service.ConcurrentStreamWriter.1
            @Override // java.lang.Runnable
            public void run() {
                ConcurrentStreamWriter.this.persistUntilCompleted(eventQueue, add);
            }
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Iterator<Cancellable> it = this.cancellables.iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
        Iterator<EventQueue> it2 = this.eventQueues.values().iterator();
        while (it2.hasNext()) {
            try {
                it2.next().close();
            } catch (IOException e) {
                LOG.warn("Failed to close writer.", e);
            }
        }
    }

    private EventQueue getEventQueue(String str, String str2) throws IOException {
        EventQueue eventQueue = this.eventQueues.get(str2);
        if (eventQueue != null) {
            return eventQueue;
        }
        this.createLock.lock();
        try {
            try {
                EventQueue eventQueue2 = this.eventQueues.get(str2);
                if (eventQueue2 != null) {
                    return eventQueue2;
                }
                if (!this.streamMetaStore.streamExists(str, str2)) {
                    throw new IllegalArgumentException("Stream not exists");
                }
                StreamUtils.ensureExists(this.streamAdmin, str2);
                if (this.generationWatched.add(str2)) {
                    this.cancellables.add(this.streamCoordinator.addListener(str2, this.writerSupplierFactory));
                }
                EventQueue eventQueue3 = new EventQueue(str2, this.writerSupplierFactory.create(str2));
                this.eventQueues.put(str2, eventQueue3);
                this.createLock.unlock();
                return eventQueue3;
            } catch (Exception e) {
                Throwables.propagateIfPossible(e, IOException.class);
                throw new IOException(e);
            }
        } finally {
            this.createLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void persistUntilCompleted(EventQueue eventQueue, HandlerStreamEventData handlerStreamEventData) {
        while (!handlerStreamEventData.isCompleted()) {
            if (!eventQueue.tryWrite()) {
                Thread.yield();
            }
        }
    }
}
