package co.cask.cdap.data.stream;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.common.conf.PropertyChangeListener;
import co.cask.cdap.common.conf.PropertyStore;
import co.cask.cdap.common.conf.SyncPropertyUpdater;
import co.cask.cdap.common.io.Codec;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.Id;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/AbstractStreamCoordinatorClient.class */
public abstract class AbstractStreamCoordinatorClient extends AbstractIdleService implements StreamCoordinatorClient {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamCoordinatorClient.class);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private PropertyStore<CoordinatorStreamProperties> propertyStore;

    /* loaded from: input_file:co/cask/cdap/data/stream/AbstractStreamCoordinatorClient$StreamPropertyChangeListener.class */
    private final class StreamPropertyChangeListener extends StreamPropertyListener implements PropertyChangeListener<CoordinatorStreamProperties> {
        private final StreamPropertyListener listener;
        private CoordinatorStreamProperties oldProperties;

        private StreamPropertyChangeListener(StreamPropertyListener streamPropertyListener) {
            this.listener = streamPropertyListener;
        }

        public void onChange(String str, CoordinatorStreamProperties coordinatorStreamProperties) {
            Id.Stream stream = (Id.Stream) Id.Stream.fromString(str, Id.Stream.class);
            if (coordinatorStreamProperties == null) {
                deleted(stream);
                this.oldProperties = null;
                return;
            }
            Integer generation = coordinatorStreamProperties.getGeneration();
            Integer generation2 = this.oldProperties == null ? null : this.oldProperties.getGeneration();
            if (generation != null && (generation2 == null || generation.intValue() > generation2.intValue())) {
                generationChanged(stream, generation.intValue());
            }
            Long ttl = coordinatorStreamProperties.getTTL();
            Long ttl2 = this.oldProperties == null ? null : this.oldProperties.getTTL();
            if (ttl != null && !ttl.equals(ttl2)) {
                ttlChanged(stream, ttl.longValue());
            }
            Integer notificationThresholdMB = coordinatorStreamProperties.getNotificationThresholdMB();
            Integer notificationThresholdMB2 = this.oldProperties == null ? null : this.oldProperties.getNotificationThresholdMB();
            if (notificationThresholdMB != null && !notificationThresholdMB.equals(notificationThresholdMB2)) {
                thresholdChanged(stream, notificationThresholdMB.intValue());
            }
            this.oldProperties = coordinatorStreamProperties;
        }

        public void onError(String str, Throwable th) {
            AbstractStreamCoordinatorClient.LOG.error("Exception on PropertyChangeListener for stream {}", str, th);
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void generationChanged(Id.Stream stream, int i) {
            try {
                this.listener.generationChanged(stream, i);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.generationChanged", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void ttlChanged(Id.Stream stream, long j) {
            try {
                this.listener.ttlChanged(stream, j);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.ttlChanged", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void thresholdChanged(Id.Stream stream, int i) {
            try {
                this.listener.thresholdChanged(stream, i);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.thresholdChanged", th);
            }
        }

        @Override // co.cask.cdap.data.stream.StreamPropertyListener
        public void deleted(Id.Stream stream) {
            try {
                this.listener.deleted(stream);
            } catch (Throwable th) {
                AbstractStreamCoordinatorClient.LOG.error("Exception while calling StreamPropertyListener.deleted", th);
            }
        }
    }

    protected abstract void doStartUp() throws Exception;

    protected abstract void doShutDown() throws Exception;

    protected abstract <T> PropertyStore<T> createPropertyStore(Codec<T> codec);

    protected abstract Lock getLock(Id.Stream stream);

    protected abstract void streamCreated(Id.Stream stream);

    protected abstract void streamDeleted(Id.Stream stream);

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public StreamConfig createStream(Id.Stream stream, Callable<StreamConfig> callable) throws Exception {
        Lock lock = getLock(stream);
        lock.lock();
        try {
            StreamConfig call = callable.call();
            if (call != null) {
                streamCreated(stream);
            }
            return call;
        } finally {
            lock.unlock();
        }
    }

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public void updateProperties(Id.Stream stream, Callable<CoordinatorStreamProperties> callable) throws Exception {
        Lock lock = getLock(stream);
        lock.lock();
        try {
            updateProperties(stream, callable.call()).get();
            lock.unlock();
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public void deleteStream(Id.Stream stream, Runnable runnable) throws Exception {
        Lock lock = getLock(stream);
        lock.lock();
        try {
            runnable.run();
            this.propertyStore.set(stream.toString(), (Object) null).get();
            streamDeleted(stream);
            lock.unlock();
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public <T> T exclusiveAction(Id.Stream stream, Callable<T> callable) throws Exception {
        Lock lock = getLock(stream);
        lock.lock();
        try {
            T call = callable.call();
            lock.unlock();
            return call;
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }

    @Override // co.cask.cdap.data.stream.StreamCoordinatorClient
    public Cancellable addListener(Id.Stream stream, StreamPropertyListener streamPropertyListener) {
        return this.propertyStore.addChangeListener(stream.toString(), new StreamPropertyChangeListener(streamPropertyListener));
    }

    protected final void startUp() throws Exception {
        this.propertyStore = createPropertyStore(new Codec<CoordinatorStreamProperties>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.1
            public byte[] encode(CoordinatorStreamProperties coordinatorStreamProperties) throws IOException {
                return Bytes.toBytes(AbstractStreamCoordinatorClient.GSON.toJson(coordinatorStreamProperties));
            }

            /* renamed from: decode, reason: merged with bridge method [inline-methods] */
            public CoordinatorStreamProperties m12decode(byte[] bArr) throws IOException {
                return (CoordinatorStreamProperties) AbstractStreamCoordinatorClient.GSON.fromJson(Bytes.toString(bArr), CoordinatorStreamProperties.class);
            }
        });
        try {
            doStartUp();
        } catch (Exception e) {
            this.propertyStore.close();
            throw e;
        }
    }

    protected final void shutDown() throws Exception {
        this.propertyStore.close();
        doShutDown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Nullable
    public <T> T firstNotNull(@Nullable T t, @Nullable T t2) {
        return t != null ? t : t2;
    }

    private ListenableFuture<CoordinatorStreamProperties> updateProperties(Id.Stream stream, final CoordinatorStreamProperties coordinatorStreamProperties) {
        return this.propertyStore.update(stream.toString(), new SyncPropertyUpdater<CoordinatorStreamProperties>() { // from class: co.cask.cdap.data.stream.AbstractStreamCoordinatorClient.2
            /* JADX INFO: Access modifiers changed from: protected */
            public CoordinatorStreamProperties compute(@Nullable CoordinatorStreamProperties coordinatorStreamProperties2) {
                return coordinatorStreamProperties2 == null ? coordinatorStreamProperties : new CoordinatorStreamProperties((Long) AbstractStreamCoordinatorClient.this.firstNotNull(coordinatorStreamProperties.getTTL(), coordinatorStreamProperties2.getTTL()), (FormatSpecification) AbstractStreamCoordinatorClient.this.firstNotNull(coordinatorStreamProperties.getFormat(), coordinatorStreamProperties2.getFormat()), (Integer) AbstractStreamCoordinatorClient.this.firstNotNull(coordinatorStreamProperties.getNotificationThresholdMB(), coordinatorStreamProperties2.getNotificationThresholdMB()), (Integer) AbstractStreamCoordinatorClient.this.firstNotNull(coordinatorStreamProperties.getGeneration(), coordinatorStreamProperties2.getGeneration()), (String) AbstractStreamCoordinatorClient.this.firstNotNull(coordinatorStreamProperties.getDescription(), coordinatorStreamProperties2.getDescription()));
            }
        });
    }
}
