package co.cask.cdap.data2.transaction.stream;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.common.utils.OSDetector;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.notifications.feeds.NotificationFeed;
import co.cask.cdap.notifications.feeds.NotificationFeedException;
import co.cask.cdap.notifications.feeds.NotificationFeedManager;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.CharStreams;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/AbstractStreamFileAdmin.class */
public abstract class AbstractStreamFileAdmin implements StreamAdmin {
    /** Name of the JSON file, located inside each stream's directory, that holds the stream configuration. */
    public static final String CONFIG_FILE_NAME = "config.json";
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamFileAdmin.class);
    /** Gson instance used to (de)serialize stream configs; registers a type adapter for {@link Schema}. */
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    /** Base location; a stream's directory is obtained by appending the stream name to it. */
    private final Location streamBaseLocation;
    /** Client notified about generation, TTL and threshold changes and about stream creation. */
    private final StreamCoordinatorClient streamCoordinatorClient;
    /** Configuration supplying the default values for per-stream settings. */
    private final CConfiguration cConf;
    /** Factory for the consumer state stores used when configuring instances and groups. */
    private final StreamConsumerStateStoreFactory stateStoreFactory;
    /** Used to create the size notification feed when a stream is created. */
    private final NotificationFeedManager notificationFeedManager;
    /**
     * Second {@link StreamAdmin} that most operations are also forwarded to.
     * NOTE(review): presumably the previous (non file-based) stream implementation - confirm.
     */
    private final StreamAdmin oldStreamAdmin;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStreamFileAdmin(LocationFactory locationFactory, CConfiguration cConfiguration, StreamCoordinatorClient streamCoordinatorClient, StreamConsumerStateStoreFactory streamConsumerStateStoreFactory, NotificationFeedManager notificationFeedManager, StreamAdmin streamAdmin) {
        this.cConf = cConfiguration;
        this.notificationFeedManager = notificationFeedManager;
        this.streamBaseLocation = locationFactory.create(cConfiguration.get("stream.base.dir"));
        this.streamCoordinatorClient = streamCoordinatorClient;
        this.stateStoreFactory = streamConsumerStateStoreFactory;
        this.oldStreamAdmin = streamAdmin;
    }

    /**
     * Drops all streams on a best-effort basis.
     *
     * <p>The delegate admin is asked to drop everything first. Then, for every entry under the stream
     * base location, the config is loaded and the coordinator client is asked for the next generation,
     * waiting for that call to complete. Failures of the delegate or of an individual stream are logged
     * and do not abort the operation. Finally all consumer states are dropped through the state store
     * factory.
     *
     * @throws Exception if listing the base location fails (other than it not existing) or dropping the
     *     consumer states fails
     */
    @Override
    public void dropAll() throws Exception {
        try {
            this.oldStreamAdmin.dropAll();
        } catch (Exception e) {
            // Deliberately best effort: a failing delegate must not prevent handling the file-based streams.
            LOG.error("Failed to truncate old stream.", e);
        }

        List<Location> streamLocations;
        try {
            streamLocations = this.streamBaseLocation.list();
        } catch (FileNotFoundException e) {
            // The base location does not exist, so there is nothing to go through.
            streamLocations = ImmutableList.of();
        }

        for (Location streamLocation : streamLocations) {
            try {
                StreamConfig config = loadConfig(streamLocation);
                this.streamCoordinatorClient.nextGeneration(config, StreamUtils.getGeneration(config)).get();
            } catch (Exception e) {
                // Keep going with the remaining streams.
                LOG.error("Failed to truncate stream {}", streamLocation.getName(), e);
            }
        }

        this.stateStoreFactory.dropAll();
    }

    /**
     * Changes the number of consumer instances of one consumer group of a stream.
     *
     * <p>The current states of the group are read from the consumer state store, the states to save and
     * to remove are computed by {@code mutateStates}, and both sets are applied to the store. If the
     * delegate admin knows the stream, the call is forwarded to it as well. The store is always closed.
     *
     * @param queueName name of the stream; must satisfy {@code isStream()}
     * @param j consumer group id
     * @param i new number of instances; must be greater than zero
     * @throws IllegalArgumentException if {@code queueName} is not a stream or {@code i <= 0}
     * @throws Exception if reading or updating the states fails
     */
    @Override
    public void configureInstances(QueueName queueName, long j, int i) throws Exception {
        Preconditions.checkArgument(queueName.isStream(), "%s is not a stream.", queueName);
        Preconditions.checkArgument(i > 0, "Number of consumer instances must be > 0.");
        LOG.info("Configure instances: {} {}", j, i);

        StreamConsumerStateStore stateStore =
            stateStoreFactory.create(StreamUtils.ensureExists(this, queueName.getSimpleName()));
        try {
            Set<StreamConsumerState> currentStates = Sets.newHashSet();
            stateStore.getByGroup(j, currentStates);

            Set<StreamConsumerState> statesToSave = Sets.newHashSet();
            Set<StreamConsumerState> statesToRemove = Sets.newHashSet();
            mutateStates(j, i, currentStates, statesToSave, statesToRemove);

            // Save before remove, mirroring the order in which the results are applied.
            if (!statesToSave.isEmpty()) {
                stateStore.save(statesToSave);
                LOG.info("Configure instances new states: {} {} {}", j, i, statesToSave);
            }
            if (!statesToRemove.isEmpty()) {
                stateStore.remove(statesToRemove);
                LOG.info("Configure instances remove states: {} {} {}", j, i, statesToRemove);
            }

            String streamUri = queueName.toURI().toString();
            if (oldStreamAdmin.exists(streamUri)) {
                oldStreamAdmin.configureInstances(queueName, j, i);
            }
        } finally {
            stateStore.close();
        }
    }

    /**
     * Configures the complete set of consumer groups of a stream.
     *
     * <p>States belonging to groups that are absent from {@code map} are scheduled for removal. For each
     * group in {@code map}, {@code mutateStates} is applied to that group's existing states with the
     * requested instance count. The resulting states are saved, obsolete ones removed, and the call is
     * forwarded to the delegate admin if it knows the stream. The store is always closed.
     *
     * @param queueName name of the stream; must satisfy {@code isStream()}
     * @param map consumer group id to number of instances; must not be empty
     * @throws IllegalArgumentException if {@code queueName} is not a stream or {@code map} is empty
     * @throws Exception if reading or updating the states fails
     */
    @Override
    public void configureGroups(QueueName queueName, Map<Long, Integer> map) throws Exception {
        Preconditions.checkArgument(queueName.isStream(), "%s is not a stream.", queueName);
        Preconditions.checkArgument(!map.isEmpty(), "Consumer group information must not be empty.");
        LOG.info("Configure groups for {}: {}", queueName, map);

        StreamConsumerStateStore stateStore =
            stateStoreFactory.create(StreamUtils.ensureExists(this, queueName.getSimpleName()));
        try {
            Set<StreamConsumerState> allStates = Sets.newHashSet();
            stateStore.getAll(allStates);

            // Everything that belongs to a group no longer listed has to go.
            Set<StreamConsumerState> statesToRemove = Sets.newHashSet();
            for (StreamConsumerState state : allStates) {
                boolean groupStillConfigured = map.containsKey(state.getGroupId());
                if (!groupStillConfigured) {
                    statesToRemove.add(state);
                }
            }

            Set<StreamConsumerState> statesToSave = Sets.newHashSet();
            for (Map.Entry<Long, Integer> groupEntry : map.entrySet()) {
                final long groupId = groupEntry.getKey();
                int instances = groupEntry.getValue();
                // Live view over the states of just this group.
                Set<StreamConsumerState> groupStates = Sets.filter(allStates, new Predicate<StreamConsumerState>() {
                    public boolean apply(StreamConsumerState candidate) {
                        return candidate.getGroupId() == groupId;
                    }
                });
                mutateStates(groupId, instances, groupStates, statesToSave, statesToRemove);
            }

            if (!statesToSave.isEmpty()) {
                stateStore.save(statesToSave);
                LOG.info("Configure groups new states: {} {}", map, statesToSave);
            }
            if (!statesToRemove.isEmpty()) {
                stateStore.remove(statesToRemove);
                LOG.info("Configure groups remove states: {} {}", map, statesToRemove);
            }

            String streamUri = queueName.toURI().toString();
            if (oldStreamAdmin.exists(streamUri)) {
                oldStreamAdmin.configureGroups(queueName, map);
            }
        } finally {
            stateStore.close();
        }
    }

    /**
     * Upgrades by delegating to the other stream admin; nothing else is done here.
     *
     * @throws Exception if the delegate fails
     */
    @Override
    public void upgrade() throws Exception {
        oldStreamAdmin.upgrade();
    }

    /**
     * Loads the configuration of the given stream.
     *
     * @param str the stream name
     * @return the configuration read from the stream's directory
     * @throws IllegalArgumentException if the stream's directory does not exist
     * @throws IOException if the configuration cannot be read
     */
    @Override
    public StreamConfig getConfig(String str) throws IOException {
        final Location streamLocation = streamBaseLocation.append(str);
        final boolean streamDirExists = streamLocation.isDirectory();
        Preconditions.checkArgument(streamDirExists, "Stream '%s' does not exist.", str);
        return loadConfig(streamLocation);
    }

    /**
     * Applies a configuration update to an existing stream.
     *
     * <p>Index interval and partition duration must match the stored configuration. A changed TTL or
     * notification threshold is handed to the coordinator client; the configuration file itself is
     * rewritten by this method only when the format differs.
     *
     * @param streamConfig the requested configuration
     * @throws IllegalArgumentException if the stream directory does not exist or the update touches
     *     settings that cannot be changed
     * @throws IOException if the stored configuration cannot be read or the new one cannot be written
     */
    @Override
    public void updateConfig(StreamConfig streamConfig) throws IOException {
        Location location = streamConfig.getLocation();
        Preconditions.checkArgument(location.isDirectory(), "Stream '%s' does not exist.", streamConfig.getName());

        StreamConfig currentConfig = loadConfig(location);
        Preconditions.checkArgument(isValidConfigUpdate(currentConfig, streamConfig), "Configuration update for stream '%s' was not valid (can only update ttl, format or threshold)", streamConfig.getName());

        if (currentConfig.getTTL() != streamConfig.getTTL()) {
            this.streamCoordinatorClient.changeTTL(currentConfig, streamConfig.getTTL());
        }

        // The thresholds are boxed Integers: they must be compared by value, not by reference,
        // otherwise equal values outside the Integer cache are reported as a change.
        // A missing (null) requested threshold is treated as "no change requested" instead of failing
        // with a NullPointerException while unboxing.
        Integer currentThreshold = currentConfig.getNotificationThresholdMB();
        Integer requestedThreshold = streamConfig.getNotificationThresholdMB();
        if (requestedThreshold != null && !requestedThreshold.equals(currentThreshold)) {
            this.streamCoordinatorClient.changeThreshold(currentConfig, requestedThreshold.intValue());
        }

        // Null-safe comparison: streams are created with a null format argument, so the stored format
        // may be absent (NOTE(review): unless StreamConfig substitutes a default - confirm).
        Object currentFormat = currentConfig.getFormat();
        Object requestedFormat = streamConfig.getFormat();
        boolean formatChanged = currentFormat == null
            ? requestedFormat != null
            : !currentFormat.equals(requestedFormat);
        if (formatChanged) {
            saveConfig(streamConfig);
        }
    }

    /**
     * Tells whether a stream exists, either as a config file under the stream's directory or in the
     * delegate admin.
     *
     * @param str the stream name
     * @return {@code true} if either source knows the stream; {@code false} otherwise, including when
     *     an {@link IOException} occurs during the check (it is logged)
     * @throws Exception if the delegate admin fails with a non-I/O error
     */
    @Override
    public boolean exists(String str) throws Exception {
        try {
            Location configLocation = streamBaseLocation.append(str).append(CONFIG_FILE_NAME);
            if (configLocation.exists()) {
                return true;
            }
            // Only consult the delegate when there is no file-based config.
            String streamUri = QueueName.fromStream(str).toURI().toString();
            return oldStreamAdmin.exists(streamUri);
        } catch (IOException ioe) {
            LOG.error("Exception when check for stream exist.", ioe);
            return false;
        }
    }

    /**
     * Creates a stream using only the configured defaults.
     *
     * @param str the stream name
     * @throws Exception if creation fails
     */
    @Override
    public void create(String str) throws Exception {
        final Properties noOverrides = null;
        create(str, noOverrides);
    }

    /**
     * Creates a stream if its config file does not exist yet.
     *
     * <p>The stream directory is created when missing. Only the caller that manages to create the
     * config file goes on to write the configuration, create the stream feed and notify the
     * coordinator client; otherwise the method returns without doing anything further. Each setting is
     * taken from {@code properties} when present and from the configuration otherwise.
     *
     * @param str the stream name
     * @param properties optional per-stream overrides; may be {@code null}
     * @throws Exception if the directory or config cannot be created or a setting is not a valid number
     */
    @Override
    public void create(String str, @Nullable Properties properties) throws Exception {
        Location streamLocation = streamBaseLocation.append(str);
        Locations.mkdirsIfNotExists(streamLocation);

        Location configLocation = streamLocation.append(CONFIG_FILE_NAME);
        if (!configLocation.createNew()) {
            // The config file is already there.
            return;
        }

        Properties overrides = properties;
        if (overrides == null) {
            overrides = new Properties();
        }

        long partitionDuration = Long.parseLong(
            overrides.getProperty("stream.partition.duration", cConf.get("stream.partition.duration")));
        long indexInterval = Long.parseLong(
            overrides.getProperty("stream.index.interval", cConf.get("stream.index.interval")));
        long ttl = Long.parseLong(
            overrides.getProperty("stream.event.ttl", cConf.get("stream.event.ttl")));
        Integer thresholdMB = Integer.valueOf(Integer.parseInt(
            overrides.getProperty("stream.notification.threshold", cConf.get("stream.notification.threshold"))));

        // No format is supplied at creation time.
        StreamConfig newConfig =
            new StreamConfig(str, partitionDuration, indexInterval, ttl, streamLocation, null, thresholdMB);
        saveConfig(newConfig);
        createStreamFeeds(newConfig);
        streamCoordinatorClient.streamCreated(str);
    }

    /**
     * Creates the size notification feed of a stream. A {@link NotificationFeedException} is logged
     * and not propagated.
     *
     * @param streamConfig configuration of the stream the feed is created for
     */
    private void createStreamFeeds(StreamConfig streamConfig) {
        try {
            String feedName = String.format("%sSize", streamConfig.getName());
            String feedDescription = String.format("Size updates feed for Stream %s every %dMB",
                                                   streamConfig.getName(), streamConfig.getNotificationThresholdMB());
            NotificationFeed.Builder feedBuilder = new NotificationFeed.Builder();
            feedBuilder.setNamespace("default");
            feedBuilder.setCategory(QueueConstants.STREAM_TABLE_PREFIX);
            feedBuilder.setName(feedName);
            feedBuilder.setDescription(feedDescription);
            notificationFeedManager.createFeed(feedBuilder.build());
        } catch (NotificationFeedException feedFailure) {
            LOG.error("Cannot create feed for Stream {}", streamConfig.getName(), feedFailure);
        }
    }

    /**
     * Truncates a stream: the delegate admin truncates it if it knows the stream, then the coordinator
     * client is asked for the next generation and the call waits for completion.
     *
     * @param str the stream name
     * @throws Exception if the stream does not exist or the operation fails
     */
    @Override
    public void truncate(String str) throws Exception {
        final String streamUri = QueueName.fromStream(str).toURI().toString();
        if (oldStreamAdmin.exists(streamUri)) {
            oldStreamAdmin.truncate(streamUri);
        }

        final StreamConfig streamConfig = getConfig(str);
        final int currentGeneration = StreamUtils.getGeneration(streamConfig);
        streamCoordinatorClient.nextGeneration(streamConfig, currentGeneration).get();
    }

    /**
     * Drops a stream. This is implemented purely as a {@link #truncate(String)}.
     *
     * @param str the stream name
     * @throws Exception if truncation fails
     */
    @Override
    public void drop(String str) throws Exception {
        this.truncate(str);
    }

    /**
     * Upgrades a single stream through the delegate admin; does nothing if the delegate does not know
     * the stream.
     *
     * @param str the stream name
     * @param properties properties passed through to the delegate
     * @throws Exception if the delegate fails
     */
    @Override
    public void upgrade(String str, Properties properties) throws Exception {
        final String streamUri = QueueName.fromStream(str).toURI().toString();
        if (!oldStreamAdmin.exists(streamUri)) {
            return;
        }
        oldStreamAdmin.upgrade(streamUri, properties);
    }

    /**
     * Writes a stream configuration as JSON to the stream's config file.
     *
     * <p>The JSON is first written to a temporary file which is then renamed onto the config file.
     * The temporary file is removed afterwards if it still exists, whether or not the write or the
     * rename succeeded.
     *
     * @param streamConfig the configuration to persist
     * @throws IOException if writing fails or the temporary file cannot be renamed onto the config file
     */
    private void saveConfig(StreamConfig streamConfig) throws IOException {
        Location configLocation = this.streamBaseLocation.append(streamConfig.getName()).append(CONFIG_FILE_NAME);
        Location tempLocation = configLocation.getTempFile((String) null);
        try {
            // Writing inside the try makes sure a partially written temp file gets cleaned up as well.
            CharStreams.write(GSON.toJson(streamConfig), CharStreams.newWriterSupplier(Locations.newOutputSupplier(tempLocation), Charsets.UTF_8));

            // On Windows the existing config file is deleted before the rename.
            if (OSDetector.isWindows()) {
                configLocation.delete();
            }

            // renameTo signals failure through a null result; do not lose the update silently.
            if (tempLocation.renameTo(configLocation) == null) {
                throw new IOException("Failed to save configuration for stream '" + streamConfig.getName() + "'");
            }
        } finally {
            if (tempLocation.exists()) {
                tempLocation.delete();
            }
        }
    }

    /**
     * Reads the stream configuration stored in the given stream directory.
     *
     * <p>The returned config takes its name and location from {@code location} rather than from the
     * file content. A notification threshold missing from the file is replaced by the configured default.
     *
     * @param location the stream's directory
     * @return the stream configuration
     * @throws IOException if the config file cannot be read
     */
    private StreamConfig loadConfig(Location location) throws IOException {
        Location configLocation = location.append(CONFIG_FILE_NAME);
        String json = CharStreams.toString(
            CharStreams.newReaderSupplier(Locations.newInputSupplier(configLocation), Charsets.UTF_8));
        StreamConfig stored = GSON.fromJson(json, StreamConfig.class);

        Integer thresholdMB = stored.getNotificationThresholdMB();
        if (thresholdMB == null) {
            thresholdMB = Integer.valueOf(cConf.getInt("stream.notification.threshold"));
        }

        return new StreamConfig(location.getName(),
                                stored.getPartitionDuration(),
                                stored.getIndexInterval(),
                                stored.getTTL(),
                                location,
                                stored.getFormat(),
                                thresholdMB);
    }

    /**
     * Checks that an update leaves the index interval and the partition duration untouched.
     *
     * @param streamConfig the stored configuration
     * @param streamConfig2 the requested configuration
     * @return {@code true} if both settings are equal in the two configurations
     */
    private boolean isValidConfigUpdate(StreamConfig streamConfig, StreamConfig streamConfig2) {
        if (streamConfig.getIndexInterval() != streamConfig2.getIndexInterval()) {
            return false;
        }
        return streamConfig.getPartitionDuration() == streamConfig2.getPartitionDuration();
    }

    /**
     * Computes the consumer states to save and to remove when the instance count of a group changes.
     *
     * <p>Nothing happens when the number of existing states already equals {@code i}. Otherwise the
     * smallest offset per event location across all existing states is collected, every existing
     * instance with an id below {@code i} gets a fresh state with those offsets, instances with an id
     * of {@code i} or above are marked for removal, and states are added for the ids from the old
     * count up to {@code i}.
     *
     * @param j the consumer group id
     * @param i the requested number of instances
     * @param set the existing states of the group (only read)
     * @param set2 receives the states to save
     * @param set3 receives the states to remove
     */
    private void mutateStates(long j, int i, Set<StreamConsumerState> set, Set<StreamConsumerState> set2, Set<StreamConsumerState> set3) {
        final int existingCount = set.size();
        if (existingCount == i) {
            return;
        }

        // Sorted by location so the resulting offsets come out in a stable order.
        Map<Location, StreamFileOffset> smallestByLocation = Maps.newTreeMap(Locations.LOCATION_COMPARATOR);
        for (StreamConsumerState existing : set) {
            for (StreamFileOffset candidate : existing.getState()) {
                StreamFileOffset known = smallestByLocation.get(candidate.getEventLocation());
                boolean isSmaller = known == null || candidate.getOffset() < known.getOffset();
                if (isSmaller) {
                    // Store a copy, not the instance owned by the existing state.
                    smallestByLocation.put(candidate.getEventLocation(), new StreamFileOffset(candidate));
                }
            }
        }
        Collection<StreamFileOffset> smallestOffsets = smallestByLocation.values();

        for (StreamConsumerState existing : set) {
            int instanceId = existing.getInstanceId();
            if (instanceId >= i) {
                set3.add(existing);
            } else {
                set2.add(new StreamConsumerState(j, instanceId, smallestOffsets));
            }
        }

        int newInstanceId = existingCount;
        while (newInstanceId < i) {
            set2.add(new StreamConsumerState(j, newInstanceId, smallestOffsets));
            newInstanceId++;
        }
    }
}
