package co.cask.cdap.data.stream;

import co.cask.cdap.common.async.ExecutorUtils;
import co.cask.cdap.common.conf.PropertyChangeListener;
import co.cask.cdap.common.conf.PropertyStore;
import co.cask.cdap.common.conf.PropertyUpdater;
import co.cask.cdap.common.io.Codec;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import co.cask.cdap.data2.transaction.stream.AbstractStreamFileAdmin;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.io.CharStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/AbstractStreamCoordinator.class */
public abstract class AbstractStreamCoordinator implements StreamCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamCoordinator.class);

    /** Serializes {@link StreamConfig} instances when the stream config file is rewritten. */
    private static final Gson GSON = new Gson();

    /** Handed to every registered listener so it can read the stream's current config. */
    private final StreamAdmin streamAdmin;

    /**
     * Lazily created property store. The memoizing supplier defers the call to
     * {@link #createPropertyStore(Codec)} until the store is first needed and then reuses the result.
     */
    private final Supplier<PropertyStore<StreamProperty>> propertyStore = Suppliers.memoize(new Supplier<PropertyStore<StreamProperty>>() {
        // Must be named get() to implement Supplier; the decompiler had renamed it to m11get().
        public PropertyStore<StreamProperty> get() {
            return AbstractStreamCoordinator.this.createPropertyStore(new StreamPropertyCodec());
        }
    });

    /** Runs the file system work of property updates on daemon threads named "stream-coordinator-update-N". */
    private final Executor updateExecutor = ExecutorUtils.newThreadExecutor(Threads.createDaemonThreadFactory("stream-coordinator-update-%d"));

    /**
     * Immutable pair of the values coordinated for a stream: its generation and its TTL.
     *
     * <p>Instances are converted to and from JSON by {@link StreamPropertyCodec}, so the field names
     * are part of the stored format. (Decompiler note: this class was originally declared private.)
     */
    public static final class StreamProperty {
        private final int generation;
        private final long ttl;

        private StreamProperty(int generation, long ttl) {
            this.generation = generation;
            this.ttl = ttl;
        }

        /** Returns the stream generation held by this property. */
        public int getGeneration() {
            return generation;
        }

        /** Returns the stream TTL held by this property. */
        public long getTTL() {
            return ttl;
        }

        @Override
        public String toString() {
            return Objects.toStringHelper(this)
                .add("generation", generation)
                .add("ttl", ttl)
                .toString();
        }
    }

    /* loaded from: input_file:co/cask/cdap/data/stream/AbstractStreamCoordinator$StreamPropertyChangeListener.class */
    private static final class StreamPropertyChangeListener extends StreamPropertyListener implements PropertyChangeListener<StreamProperty> {
        private final StreamPropertyListener listener;
        private StreamProperty currentProperty;

        private StreamPropertyChangeListener(StreamAdmin streamAdmin, String str, StreamPropertyListener streamPropertyListener) {
            this.listener = streamPropertyListener;
            try {
                StreamConfig config = streamAdmin.getConfig(str);
                this.currentProperty = new StreamProperty(StreamUtils.getGeneration(config), config.getTTL());
            } catch (Exception e) {
                this.currentProperty = new StreamProperty(0, KeyValue.LATEST_TIMESTAMP);
            }
        }

        public void onChange(String str, StreamProperty streamProperty) {
            try {
                if (streamProperty != null) {
                    if (this.currentProperty == null || this.currentProperty.getGeneration() < streamProperty.getGeneration()) {
                        generationChanged(str, streamProperty.getGeneration());
                    }
                    if (this.currentProperty == null || this.currentProperty.getTTL() != streamProperty.getTTL()) {
                        ttlChanged(str, streamProperty.getTTL());
                    }
                } else {
                    generationDeleted(str);
                    ttlDeleted(str);
                }
            } finally {
                this.currentProperty = streamProperty;
            }
        }

        public void onError(String str, Throwable th) {
            AbstractStreamCoordinator.LOG.error("Exception on PropertyChangeListener for stream {}", str, th);
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void generationChanged(String str, int i) {
            try {
                this.listener.generationChanged(str, i);
            } catch (Throwable th) {
                AbstractStreamCoordinator.LOG.error("Exception while calling StreamPropertyListener.generationChanged", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void generationDeleted(String str) {
            try {
                this.listener.generationDeleted(str);
            } catch (Throwable th) {
                AbstractStreamCoordinator.LOG.error("Exception while calling StreamPropertyListener.generationDeleted", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void ttlChanged(String str, long j) {
            try {
                this.listener.ttlChanged(str, j);
            } catch (Throwable th) {
                AbstractStreamCoordinator.LOG.error("Exception while calling StreamPropertyListener.ttlChanged", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void ttlDeleted(String str) {
            try {
                this.listener.ttlDeleted(str);
            } catch (Throwable th) {
                AbstractStreamCoordinator.LOG.error("Exception while calling StreamPropertyListener.ttlDeleted", th);
            }
        }
    }

    /**
     * {@link Codec} that stores a {@link StreamProperty} as UTF-8 encoded JSON.
     * (Decompiler note: this class was originally declared private.)
     */
    public static final class StreamPropertyCodec implements Codec<StreamProperty> {
        private static final Gson GSON = new Gson();

        private StreamPropertyCodec() {
        }

        /**
         * Serializes the property to JSON and returns its UTF-8 bytes.
         *
         * @param streamProperty the property to serialize
         * @return the UTF-8 bytes of the JSON form
         */
        public byte[] encode(StreamProperty streamProperty) throws IOException {
            return GSON.toJson(streamProperty).getBytes(Charsets.UTF_8);
        }

        /**
         * Parses UTF-8 encoded JSON back into a {@link StreamProperty}.
         *
         * <p>Named {@code decode} so that it implements the {@link Codec} method; the decompiler had
         * renamed it to {@code m13decode}, which left the interface method unimplemented.
         *
         * @param bArr UTF-8 bytes of the JSON form
         * @return the parsed property
         */
        public StreamProperty decode(byte[] bArr) throws IOException {
            return GSON.fromJson(new String(bArr, Charsets.UTF_8), StreamProperty.class);
        }
    }

    /**
     * Creates a coordinator backed by the given stream admin.
     * (Decompiler note: this constructor was originally declared protected.)
     *
     * @param streamAdmin admin handed to each listener registered through {@code addListener},
     *                    which uses it to read the stream's current config
     */
    public AbstractStreamCoordinator(StreamAdmin streamAdmin) {
        this.streamAdmin = streamAdmin;
    }

    /**
     * Creates the {@link PropertyStore} that holds the coordinated stream properties.
     * This class calls it through a memoizing supplier, passing a {@code StreamPropertyCodec}.
     *
     * @param codec codec used to encode and decode the stored property values
     * @param <T> type of the stored property
     * @return the property store to use
     */
    protected abstract <T> PropertyStore<T> createPropertyStore(Codec<T> codec);

    /**
     * Advances the stream to its next generation.
     *
     * <p>The new generation is one more than the generation in the property store, or one more than
     * {@code i} when the store holds no property for the stream yet. The generation directory is
     * created before the new property is published. The TTL is carried over from the stored
     * property, or taken from {@code streamConfig} when there is none.
     *
     * @param streamConfig config of the stream; supplies its name, location and fallback TTL
     * @param i generation to advance from when the store holds no property yet
     * @return a future yielding the new generation, or failing with whatever the update threw
     */
    @Override // co.cask.cdap.data.stream.StreamCoordinator
    public ListenableFuture<Integer> nextGeneration(final StreamConfig streamConfig, final int i) {
        PropertyUpdater<StreamProperty> updater = new PropertyUpdater<StreamProperty>() {
            public ListenableFuture<StreamProperty> apply(@Nullable final StreamProperty streamProperty) {
                final SettableFuture<StreamProperty> result = SettableFuture.create();
                AbstractStreamCoordinator.this.updateExecutor.execute(new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            long ttl = streamProperty == null ? streamConfig.getTTL() : streamProperty.getTTL();
                            int generation = (streamProperty == null ? i : streamProperty.getGeneration()) + 1;
                            Locations.mkdirsIfNotExists(StreamUtils.createGenerationLocation(streamConfig.getLocation(), generation));
                            result.set(new StreamProperty(generation, ttl));
                        } catch (Throwable t) {
                            // Catch everything, not just IOException: an unchecked failure would
                            // otherwise leave the future incomplete and block the caller forever.
                            result.setException(t);
                        }
                    }
                });
                return result;
            }
        };
        ListenableFuture<StreamProperty> updated = this.propertyStore.get().update(streamConfig.getName(), updater);
        return Futures.transform(updated, new Function<StreamProperty, Integer>() {
            public Integer apply(StreamProperty streamProperty) {
                return Integer.valueOf(streamProperty.getGeneration());
            }
        });
    }

    /**
     * Changes the TTL of a stream.
     *
     * <p>A copy of {@code streamConfig} with the new TTL is written to the stream's config file
     * before the new property is published. The generation is carried over from the stored
     * property, or read from {@code streamConfig} when there is none.
     *
     * @param streamConfig current config of the stream
     * @param j the new TTL
     * @return a future yielding the new TTL, or failing with whatever the update threw
     */
    @Override // co.cask.cdap.data.stream.StreamCoordinator
    public ListenableFuture<Long> changeTTL(final StreamConfig streamConfig, final long j) {
        PropertyUpdater<StreamProperty> updater = new PropertyUpdater<StreamProperty>() {
            public ListenableFuture<StreamProperty> apply(@Nullable final StreamProperty streamProperty) {
                final SettableFuture<StreamProperty> result = SettableFuture.create();
                AbstractStreamCoordinator.this.updateExecutor.execute(new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            int generation = streamProperty == null ? StreamUtils.getGeneration(streamConfig) : streamProperty.getGeneration();
                            StreamConfig newConfig = new StreamConfig(streamConfig.getName(), streamConfig.getPartitionDuration(), streamConfig.getIndexInterval(), j, streamConfig.getLocation());
                            AbstractStreamCoordinator.this.saveConfig(newConfig);
                            result.set(new StreamProperty(generation, j));
                        } catch (Throwable t) {
                            // saveConfig can throw IllegalStateException when the rename fails.
                            // Catching only IOException would leave the future incomplete forever.
                            result.setException(t);
                        }
                    }
                });
                return result;
            }
        };
        ListenableFuture<StreamProperty> updated = this.propertyStore.get().update(streamConfig.getName(), updater);
        return Futures.transform(updated, new Function<StreamProperty, Long>() {
            public Long apply(StreamProperty streamProperty) {
                return Long.valueOf(streamProperty.getTTL());
            }
        });
    }

    /**
     * Registers a listener for property changes of the named stream.
     *
     * @param str name of the stream to watch
     * @param streamPropertyListener listener receiving generation and TTL callbacks
     * @return the handle returned by the property store for this registration
     */
    @Override // co.cask.cdap.data.stream.StreamCoordinator
    public Cancellable addListener(String str, StreamPropertyListener streamPropertyListener) {
        PropertyStore<StreamProperty> store = this.propertyStore.get();
        StreamPropertyChangeListener changeListener = new StreamPropertyChangeListener(this.streamAdmin, str, streamPropertyListener);
        return store.addChangeListener(str, changeListener);
    }

    /**
     * Closes the underlying property store.
     *
     * @throws IOException if closing the store fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        PropertyStore<StreamProperty> store = this.propertyStore.get();
        store.close();
    }

    /**
     * Writes the given config as JSON to the stream's config file.
     *
     * <p>The JSON is first written to a temporary file that is then renamed onto the config file.
     * The temporary file is removed afterwards if it still exists, whether or not the write and
     * rename succeeded. (Decompiler note: this method was originally declared private.)
     *
     * @param streamConfig the config to persist; its location determines where the file is written
     * @throws IOException if writing or cleaning up the temporary file fails
     * @throws IllegalStateException if the rename onto the config file fails
     */
    public void saveConfig(StreamConfig streamConfig) throws IOException {
        Location configLocation = streamConfig.getLocation().append(AbstractStreamFileAdmin.CONFIG_FILE_NAME);
        Location tempLocation = configLocation.getTempFile("tmp");
        try {
            CharStreams.write(GSON.toJson(streamConfig), CharStreams.newWriterSupplier(Locations.newOutputSupplier(tempLocation), Charsets.UTF_8));
            // Guava's Preconditions substitutes only %s placeholders; the previous "{}" template
            // was never filled in, so the failure message did not name the files.
            Preconditions.checkState(tempLocation.renameTo(configLocation) != null, "Rename %s to %s failed", tempLocation, configLocation);
        } finally {
            if (tempLocation.exists()) {
                tempLocation.delete();
            }
        }
    }
}
