package co.cask.cdap.data2.transaction.stream;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.stream.StreamSpecification;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.StreamNotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.common.utils.OSDetector;
import co.cask.cdap.data.stream.CoordinatorStreamProperties;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data.stream.service.StreamMetaStore;
import co.cask.cdap.data.view.ViewAdmin;
import co.cask.cdap.data2.audit.AuditPublisher;
import co.cask.cdap.data2.audit.AuditPublishers;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.store.MetadataStore;
import co.cask.cdap.data2.metadata.system.StreamSystemMetadataWriter;
import co.cask.cdap.data2.metadata.writer.LineageWriter;
import co.cask.cdap.data2.registry.RuntimeUsageRegistry;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.explore.client.ExploreFacade;
import co.cask.cdap.explore.utils.ExploreTableNaming;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.notifications.feeds.NotificationFeedException;
import co.cask.cdap.notifications.feeds.NotificationFeedManager;
import co.cask.cdap.proto.StreamProperties;
import co.cask.cdap.proto.ViewSpecification;
import co.cask.cdap.proto.audit.AuditPayload;
import co.cask.cdap.proto.audit.AuditType;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.KerberosPrincipalId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.id.StreamViewId;
import co.cask.cdap.proto.notification.NotificationFeedInfo;
import co.cask.cdap.security.impersonation.Impersonator;
import co.cask.cdap.security.impersonation.OwnerAdmin;
import co.cask.cdap.security.impersonation.SecurityUtil;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.CharStreams;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Inject;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/FileStreamAdmin.class */
public class FileStreamAdmin implements StreamAdmin {
    /** File name of the per-stream configuration file; appended to a stream's location by getConfigLocation. */
    private static final String CONFIG_FILE_NAME = "config.json";
    private static final Logger LOG = LoggerFactory.getLogger(FileStreamAdmin.class);
    /** Gson with a SchemaTypeAdapter registered for Schema; used in this class to parse stream config and format specification JSON. */
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    // The final collaborators below are all assigned exactly once, in the @Inject constructor.
    private final NamespacedLocationFactory namespacedLocationFactory;
    private final StreamCoordinatorClient streamCoordinatorClient;
    private final CConfiguration cConf;
    private final StreamConsumerStateStoreFactory stateStoreFactory;
    private final NotificationFeedManager notificationFeedManager;
    /** Value of the "stream.base.dir" configuration key, read once in the constructor. */
    private final String streamBaseDirPath;
    private final RuntimeUsageRegistry runtimeUsageRegistry;
    private final LineageWriter lineageWriter;
    private final StreamMetaStore streamMetaStore;
    private final OwnerAdmin ownerAdmin;
    private final ExploreTableNaming tableNaming;
    private final ViewAdmin viewAdmin;
    private final MetadataStore metadataStore;
    private final Impersonator impersonator;
    // Optional collaborators: only set through the @Inject(optional = true) setters, so they stay null if never injected.
    private ExploreFacade exploreFacade;
    private AuditPublisher auditPublisher;

    /**
     * Creates the admin and stores every injected collaborator.
     *
     * <p>The stream base directory path is read from {@code cConfiguration} under the key
     * {@code "stream.base.dir"} at construction time and cached.
     */
    @Inject
    public FileStreamAdmin(NamespacedLocationFactory namespacedLocationFactory,
                           CConfiguration cConfiguration,
                           StreamCoordinatorClient streamCoordinatorClient,
                           StreamConsumerStateStoreFactory streamConsumerStateStoreFactory,
                           NotificationFeedManager notificationFeedManager,
                           RuntimeUsageRegistry runtimeUsageRegistry,
                           LineageWriter lineageWriter,
                           StreamMetaStore streamMetaStore,
                           OwnerAdmin ownerAdmin,
                           ExploreTableNaming exploreTableNaming,
                           MetadataStore metadataStore,
                           ViewAdmin viewAdmin,
                           Impersonator impersonator) {
        // Location and configuration.
        this.namespacedLocationFactory = namespacedLocationFactory;
        this.cConf = cConfiguration;
        this.streamBaseDirPath = cConfiguration.get("stream.base.dir");

        // Coordination, consumer state and notifications.
        this.streamCoordinatorClient = streamCoordinatorClient;
        this.stateStoreFactory = streamConsumerStateStoreFactory;
        this.notificationFeedManager = notificationFeedManager;

        // Usage, lineage and metadata bookkeeping.
        this.runtimeUsageRegistry = runtimeUsageRegistry;
        this.lineageWriter = lineageWriter;
        this.streamMetaStore = streamMetaStore;
        this.metadataStore = metadataStore;

        // Ownership, explore naming, views and impersonation.
        this.ownerAdmin = ownerAdmin;
        this.tableNaming = exploreTableNaming;
        this.viewAdmin = viewAdmin;
        this.impersonator = impersonator;
    }

    /**
     * Optional injection point for the {@link ExploreFacade}; the field keeps its previous value
     * (initially {@code null}) when this setter is never invoked.
     *
     * @param exploreFacade the facade to store
     */
    @Inject(optional = true)
    public void setExploreFacade(ExploreFacade exploreFacade) {
        this.exploreFacade = exploreFacade;
    }

    /**
     * Optional injection point for the {@link AuditPublisher}; the field keeps its previous value
     * (initially {@code null}) when this setter is never invoked.
     *
     * @param auditPublisher the publisher to store
     */
    @Inject(optional = true)
    public void setAuditPublisher(AuditPublisher auditPublisher) {
        this.auditPublisher = auditPublisher;
    }

    /**
     * Drops every stream found under the namespace's stream base location, then asks the
     * consumer state store factory to drop all state for that namespace.
     *
     * <p>Both the listing and the state-store cleanup run through the impersonator for the
     * namespace. A {@link FileNotFoundException} raised while listing is treated as
     * "no streams to drop".
     *
     * @param namespaceId the namespace whose streams are removed
     * @throws Exception if listing, dropping a stream, or dropping the consumer state fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void dropAllInNamespace(final NamespaceId namespaceId) throws Exception {
        // Keep the element type: a raw Iterable would make the for-each variable an Object.
        Iterable<Location> streamLocations = this.impersonator.doAs(namespaceId, new Callable<Iterable<Location>>() {
            @Override
            public Iterable<Location> call() throws Exception {
                try {
                    return StreamUtils.listAllStreams(getStreamBaseLocation(namespaceId));
                } catch (FileNotFoundException e) {
                    // Nothing could be listed; behave as if the namespace has no streams.
                    return ImmutableList.of();
                }
            }
        });

        for (Location streamLocation : streamLocations) {
            StreamId streamId = namespaceId.stream(StreamUtils.getStreamNameFromLocation(streamLocation));
            doDrop(streamId, streamLocation);
        }

        this.impersonator.doAs(namespaceId, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                stateStoreFactory.dropAllInNamespace(namespaceId);
                return null;
            }
        });
    }

    /**
     * Reconfigures the consumer states of one consumer group for a new instance count.
     *
     * <p>Loads the group's current states from the stream's state store, lets
     * {@code mutateStates} compute which states to add and which to remove, then saves and
     * removes them accordingly. The state store is opened with try-with-resources so it is
     * always closed, and a failure of {@code close()} is attached as a suppressed exception
     * instead of replacing an exception thrown by the body.
     *
     * @param streamId the stream whose consumer states are reconfigured
     * @param j the consumer group id
     * @param i the number of consumer instances; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive
     * @throws Exception if the stream lookup or any state-store operation fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void configureInstances(StreamId streamId, long j, int i) throws Exception {
        Preconditions.checkArgument(i > 0, "Number of consumer instances must be > 0.");
        LOG.info("Configure instances: {} {}", Long.valueOf(j), Integer.valueOf(i));

        try (StreamConsumerStateStore stateStore = this.stateStoreFactory.create(StreamUtils.ensureExists(this, streamId))) {
            Set<StreamConsumerState> currentStates = Sets.newHashSet();
            stateStore.getByGroup(j, currentStates);

            Set<StreamConsumerState> statesToAdd = Sets.newHashSet();
            Set<StreamConsumerState> statesToRemove = Sets.newHashSet();
            mutateStates(j, i, currentStates, statesToAdd, statesToRemove);

            if (!statesToAdd.isEmpty()) {
                stateStore.save(statesToAdd);
                LOG.info("Configure instances new states: {} {} {}", new Object[]{Long.valueOf(j), Integer.valueOf(i), statesToAdd});
            }
            if (!statesToRemove.isEmpty()) {
                stateStore.remove(statesToRemove);
                LOG.info("Configure instances remove states: {} {} {}", new Object[]{Long.valueOf(j), Integer.valueOf(i), statesToRemove});
            }
        }
    }

    /**
     * Reconfigures the consumer states of a stream for a complete set of consumer groups.
     *
     * <p>States belonging to a group id that is not a key of {@code map} are scheduled for
     * removal. For every entry of {@code map}, {@code mutateStates} is invoked with the
     * existing states of that group so it can add to the "new" and "remove" sets. Non-empty
     * result sets are then saved to / removed from the state store. The state store is managed
     * by try-with-resources, so it is closed on every path and close failures are recorded as
     * suppressed exceptions.
     *
     * @param streamId the stream whose consumer states are reconfigured
     * @param map consumer group id mapped to its number of instances; must not be empty
     * @throws IllegalArgumentException if {@code map} is empty
     * @throws Exception if the stream lookup or any state-store operation fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void configureGroups(StreamId streamId, Map<Long, Integer> map) throws Exception {
        Preconditions.checkArgument(!map.isEmpty(), "Consumer group information must not be empty.");
        LOG.info("Configure groups for {}: {}", streamId, map);

        try (StreamConsumerStateStore stateStore = this.stateStoreFactory.create(StreamUtils.ensureExists(this, streamId))) {
            Set<StreamConsumerState> existingStates = Sets.newHashSet();
            stateStore.getAll(existingStates);

            // States of groups that are no longer configured are dropped outright.
            Set<StreamConsumerState> statesToRemove = Sets.newHashSet();
            for (StreamConsumerState state : existingStates) {
                if (!map.containsKey(Long.valueOf(state.getGroupId()))) {
                    statesToRemove.add(state);
                }
            }

            Set<StreamConsumerState> statesToAdd = Sets.newHashSet();
            for (Map.Entry<Long, Integer> groupEntry : map.entrySet()) {
                final long groupId = groupEntry.getKey().longValue();
                // Live filtered view restricted to the states of the group being processed.
                Set<StreamConsumerState> groupStates = Sets.filter(existingStates, new Predicate<StreamConsumerState>() {
                    @Override
                    public boolean apply(StreamConsumerState candidate) {
                        return candidate.getGroupId() == groupId;
                    }
                });
                mutateStates(groupId, groupEntry.getValue().intValue(), groupStates, statesToAdd, statesToRemove);
            }

            if (!statesToAdd.isEmpty()) {
                stateStore.save(statesToAdd);
                LOG.info("Configure groups new states: {} {}", map, statesToAdd);
            }
            if (!statesToRemove.isEmpty()) {
                stateStore.remove(statesToRemove);
                LOG.info("Configure groups remove states: {} {}", map, statesToRemove);
            }
        }
    }

    /**
     * Upgrade hook required by {@link StreamAdmin}; this implementation has an empty body and
     * therefore performs no work.
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void upgrade() throws Exception {
    }

    /**
     * Lists the stream specifications of a namespace by delegating to the stream meta store.
     *
     * @param namespaceId the namespace to list
     * @return whatever the meta store reports for that namespace
     * @throws Exception if the meta store lookup fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception {
        List<StreamSpecification> specifications = streamMetaStore.listStreams(namespaceId);
        return specifications;
    }

    /**
     * Reads the configuration of a stream from its config file.
     *
     * <p>The file is read through the impersonator for the stream and parsed as JSON. When the
     * stored notification threshold is not positive, the value of the
     * {@code "stream.notification.threshold"} configuration key is used instead. The returned
     * config is rebuilt with the given stream id and the current stream location.
     *
     * @param streamId the stream whose configuration is read
     * @return the stream configuration
     * @throws FileNotFoundException (an {@link IOException}) if the config file does not exist
     * @throws IOException if reading fails; any other checked exception is wrapped in one
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public StreamConfig getConfig(final StreamId streamId) throws IOException {
        Callable<StreamConfig> readConfig = new Callable<StreamConfig>() {
            @Override
            public StreamConfig call() throws IOException {
                Location configFile = getConfigLocation(streamId);
                if (!configFile.exists()) {
                    throw new FileNotFoundException(String.format("Configuration file %s for stream '%s' does not exist.", configFile.toURI().getPath(), streamId));
                }

                String json = CharStreams.toString(CharStreams.newReaderSupplier(Locations.newInputSupplier(configFile), Charsets.UTF_8));
                StreamConfig stored = GSON.fromJson(json, StreamConfig.class);

                // Fall back to the configured default when the file carries no usable threshold.
                int storedThresholdMB = stored.getNotificationThresholdMB();
                int thresholdMB = storedThresholdMB > 0
                    ? storedThresholdMB
                    : cConf.getInt("stream.notification.threshold");

                return new StreamConfig(
                    streamId,
                    stored.getPartitionDuration(),
                    stored.getIndexInterval(),
                    stored.getTTL(),
                    getStreamLocation(streamId),
                    stored.getFormat(),
                    thresholdMB);
            }
        };

        try {
            return impersonator.doAs(streamId, readConfig);
        } catch (Exception e) {
            // IOExceptions and unchecked throwables pass through unchanged; the rest gets wrapped.
            Throwables.propagateIfPossible(e, IOException.class);
            throw new IOException(e);
        }
    }

    /**
     * Assembles the user-facing properties of a stream: TTL, format and notification threshold
     * from the stream config, the description from the stream meta store, and the owner
     * principal from the owner admin.
     *
     * @param streamId the stream to describe
     * @return the combined stream properties
     * @throws Exception if any of the lookups fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public StreamProperties getProperties(StreamId streamId) throws Exception {
        // The owner lookup stays first, matching the original call order.
        String owner = ownerAdmin.getOwnerPrincipal(streamId);
        StreamConfig streamConfig = getConfig(streamId);

        Long ttl = Long.valueOf(streamConfig.getTTL());
        Integer thresholdMB = Integer.valueOf(streamConfig.getNotificationThresholdMB());
        return new StreamProperties(
            ttl,
            streamConfig.getFormat(),
            thresholdMB,
            streamMetaStore.getStream(streamId).getDescription(),
            owner);
    }

    /**
     * Updates the properties of an existing stream.
     *
     * <p>Verifies that the stream location is a directory and that the owner principal in the
     * request passes {@code SecurityUtil.verifyOwnerPrincipal}. The actual update runs as the
     * callable handed to the stream coordinator client: it applies the new properties,
     * re-creates the explore stream when the requested format's schema differs, publishes an
     * UPDATE audit event and returns the coordinator properties built from the request.
     *
     * @param streamId the stream to update
     * @param streamProperties the requested property values
     * @throws IllegalArgumentException if the stream location is not a directory
     * @throws Exception if verification or the update fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void updateConfig(final StreamId streamId, final StreamProperties streamProperties) throws Exception {
        // Note: the location is resolved while impersonating for the parent namespace.
        Location streamLocation = impersonator.doAs(streamId.getParent(), new Callable<Location>() {
            @Override
            public Location call() throws Exception {
                return getStreamLocation(streamId);
            }
        });
        Preconditions.checkArgument(streamLocation.isDirectory(), "Stream '%s' does not exist.", new Object[]{streamId});

        SecurityUtil.verifyOwnerPrincipal(streamId, streamProperties.getOwnerPrincipal(), ownerAdmin);

        Callable<CoordinatorStreamProperties> updateAction = new Callable<CoordinatorStreamProperties>() {
            @Override
            public CoordinatorStreamProperties call() throws Exception {
                // NOTE(review): presumably the properties as they were before the update -
                // confirm against updateProperties, whose body is not fully visible here.
                StreamProperties returnedProperties = updateProperties(streamId, streamProperties);

                FormatSpecification requestedFormat = streamProperties.getFormat();
                boolean schemaDiffers = requestedFormat != null
                    && !Objects.equals(returnedProperties.getFormat().getSchema(), requestedFormat.getSchema());
                if (schemaDiffers) {
                    // Disable, then enable again with the requested format.
                    alterExploreStream(streamId, false, null);
                    alterExploreStream(streamId, true, requestedFormat);
                }

                publishAudit(streamId, AuditType.UPDATE);
                return new CoordinatorStreamProperties(
                    streamProperties.getTTL(),
                    streamProperties.getFormat(),
                    streamProperties.getNotificationThresholdMB(),
                    null,
                    streamProperties.getDescription(),
                    streamProperties.getOwnerPrincipal());
            }
        };
        streamCoordinatorClient.updateProperties(streamId, updateAction);
    }

    /**
     * Tells whether a stream exists: it must be known to the stream meta store and its config
     * file must exist (checked through the impersonator for the stream).
     *
     * <p>An {@link IOException} during the check is logged and reported as "does not exist".
     *
     * @param streamId the stream to check
     * @return {@code true} only when both checks succeed
     * @throws Exception for failures other than {@link IOException}
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public boolean exists(final StreamId streamId) throws Exception {
        try {
            if (!streamMetaStore.streamExists(streamId)) {
                return false;
            }
            Boolean configPresent = impersonator.doAs(streamId, new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    Location configFile = getConfigLocation(streamId);
                    return Boolean.valueOf(configFile.exists());
                }
            });
            return configPresent.booleanValue();
        } catch (IOException e) {
            LOG.error("Exception when check for stream exist.", e);
            return false;
        }
    }

    /**
     * Creates a stream with an empty set of creation properties.
     *
     * @param streamId the stream to create
     * @return the result of {@link #create(StreamId, Properties)}, which may be {@code null}
     * @throws Exception if creation fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    @Nullable
    public StreamConfig create(StreamId streamId) throws Exception {
        Properties noProperties = new Properties();
        return create(streamId, noProperties);
    }

    /**
     * Creates a stream with the given creation properties.
     *
     * <p>If the stream already exists, only the owner principal (the {@code "principal"}
     * property, if present) is verified and {@code null} is returned. Otherwise the owner is
     * registered when a principal was given, the stream directory is created through the
     * impersonator, and {@code createStream} performs the actual creation.
     *
     * <p>When creation fails, the owner entry is deleted again. A failure of that cleanup is
     * attached to the original exception as suppressed, so the root cause is what callers see.
     *
     * @param streamId the stream to create
     * @param properties creation properties; {@code null} is treated as empty
     * @return the new stream's config, or {@code null} if the stream already existed
     * @throws Exception if verification or creation fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    @Nullable
    public StreamConfig create(final StreamId streamId, @Nullable Properties properties) throws Exception {
        Properties effectiveProperties = properties == null ? new Properties() : properties;
        String ownerPrincipal = effectiveProperties.containsKey("principal")
            ? effectiveProperties.getProperty("principal")
            : null;

        if (exists(streamId)) {
            SecurityUtil.verifyOwnerPrincipal(streamId, ownerPrincipal, this.ownerAdmin);
            return null;
        }

        if (ownerPrincipal != null) {
            this.ownerAdmin.add(streamId, new KerberosPrincipalId(ownerPrincipal));
        }

        try {
            Location streamLocation = this.impersonator.doAs(streamId, new Callable<Location>() {
                @Override
                public Location call() throws Exception {
                    assertNamespaceHomeExists(streamId.getParent());
                    Location location = getStreamLocation(streamId);
                    Locations.mkdirsIfNotExists(location);
                    return location;
                }
            });
            return createStream(streamId, effectiveProperties, streamLocation);
        } catch (Exception e) {
            // Roll back the owner registration, but never let the rollback hide the real failure.
            try {
                this.ownerAdmin.delete(streamId);
            } catch (Exception cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks that the parent of the namespace's stream base location is present and exists.
     *
     * @param namespaceId the namespace to check
     * @throws IllegalArgumentException if that parent location is {@code null} or missing
     * @throws IOException if resolving or probing the location fails
     */
    public void assertNamespaceHomeExists(NamespaceId namespaceId) throws IOException {
        Location namespaceHome = Locations.getParent(getStreamBaseLocation(namespaceId));
        boolean homePresent = namespaceHome != null && namespaceHome.exists();
        Preconditions.checkArgument(homePresent, "Home directory %s for namespace %s not found", new Object[]{namespaceHome, namespaceId});
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the size notification feed of a stream (category {@code "stream"}, feed name
     * {@code <stream name>Size}).
     *
     * <p>A {@link NotificationFeedException} is logged and not rethrown.
     *
     * @param streamConfig configuration of the stream the feed belongs to
     */
    public void createStreamFeeds(StreamConfig streamConfig) {
        try {
            String feedName = String.format("%sSize", streamConfig.getStreamId().getEntityName());
            String feedDescription = String.format(
                "Size updates feed for Stream %s every %dMB",
                streamConfig.getStreamId(),
                Integer.valueOf(streamConfig.getNotificationThresholdMB()));
            NotificationFeedInfo feedInfo = new NotificationFeedInfo(
                streamConfig.getStreamId().getNamespace(), "stream", feedName, feedDescription);
            notificationFeedManager.createFeed(feedInfo);
        } catch (NotificationFeedException e) {
            LOG.error("Cannot create feed for Stream {}", streamConfig.getStreamId(), e);
        }
    }

    /**
     * Truncates a stream by running {@code doTruncate} on the stream's location through the
     * impersonator for that stream.
     *
     * @param streamId the stream to truncate
     * @throws Exception if the truncation fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void truncate(final StreamId streamId) throws Exception {
        Callable<Void> truncateAction = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Location streamLocation = getStreamLocation(streamId);
                doTruncate(streamId, streamLocation);
                return null;
            }
        };
        impersonator.doAs(streamId, truncateAction);
    }

    /**
     * Drops a stream: resolves its location through the impersonator, then hands both to
     * {@code doDrop}.
     *
     * @param streamId the stream to drop
     * @throws Exception if resolving the location or dropping fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void drop(final StreamId streamId) throws Exception {
        Location streamLocation = impersonator.doAs(streamId, new Callable<Location>() {
            @Override
            public Location call() throws Exception {
                return getStreamLocation(streamId);
            }
        });
        doDrop(streamId, streamLocation);
    }

    /**
     * Creates or updates a view of a stream, run as an exclusive action of the stream
     * coordinator client on the view's parent stream.
     *
     * @param streamViewId the view to create or update
     * @param viewSpecification the specification to store
     * @return the value returned by {@code viewAdmin.createOrUpdate}
     * @throws NotFoundException if the parent stream does not exist
     * @throws Exception if the action fails for another reason
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public boolean createOrUpdateView(final StreamViewId streamViewId, final ViewSpecification viewSpecification) throws Exception {
        final StreamId stream = streamViewId.getParent();
        Boolean outcome = streamCoordinatorClient.exclusiveAction(stream, new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                if (!exists(stream)) {
                    throw new NotFoundException(stream);
                }
                return Boolean.valueOf(viewAdmin.createOrUpdate(streamViewId, viewSpecification));
            }
        });
        return outcome.booleanValue();
    }

    /**
     * Deletes a view of a stream, run as an exclusive action of the stream coordinator client
     * on the view's parent stream.
     *
     * @param streamViewId the view to delete
     * @throws StreamNotFoundException if the parent stream does not exist
     * @throws Exception if the action fails for another reason
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void deleteView(final StreamViewId streamViewId) throws Exception {
        final StreamId stream = streamViewId.getParent();
        Callable<Void> deleteAction = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (exists(stream)) {
                    viewAdmin.delete(streamViewId);
                    return null;
                }
                throw new StreamNotFoundException(stream);
            }
        };
        streamCoordinatorClient.exclusiveAction(stream, deleteAction);
    }

    /**
     * Lists the views of a stream.
     *
     * @param streamId the stream whose views are listed
     * @return the view ids reported by the view admin
     * @throws StreamNotFoundException if the stream does not exist
     * @throws Exception if the lookup fails for another reason
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public List<StreamViewId> listViews(StreamId streamId) throws Exception {
        if (!exists(streamId)) {
            throw new StreamNotFoundException(streamId);
        }
        return viewAdmin.list(streamId);
    }

    /**
     * Fetches the specification of a stream view.
     *
     * @param streamViewId the view to fetch
     * @return the specification returned by the view admin
     * @throws StreamNotFoundException if the view's parent stream does not exist
     * @throws Exception if the lookup fails for another reason
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public ViewSpecification getView(StreamViewId streamViewId) throws Exception {
        StreamId stream = streamViewId.getParent();
        if (!exists(stream)) {
            throw new StreamNotFoundException(stream);
        }
        return viewAdmin.get(streamViewId);
    }

    /**
     * Tells whether a stream view exists.
     *
     * @param streamViewId the view to check
     * @return the answer of the view admin
     * @throws StreamNotFoundException if the view's parent stream does not exist
     * @throws Exception if the lookup fails for another reason
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public boolean viewExists(StreamViewId streamViewId) throws Exception {
        StreamId stream = streamViewId.getParent();
        if (!exists(stream)) {
            throw new StreamNotFoundException(stream);
        }
        return viewAdmin.exists(streamViewId);
    }

    /**
     * Registers the given entities as users of a stream by delegating to the runtime usage
     * registry.
     *
     * @param iterable the entities using the stream
     * @param streamId the stream being used
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void register(Iterable<? extends EntityId> iterable, StreamId streamId) {
        runtimeUsageRegistry.registerAll(iterable, streamId);
    }

    /**
     * Records an access of a program run to a stream: first in the lineage writer, then as an
     * access audit message through {@code AuditPublishers.publishAccess}.
     *
     * @param programRunId the program run accessing the stream
     * @param streamId the accessed stream
     * @param accessType the kind of access
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType) {
        lineageWriter.addAccess(programRunId, streamId, accessType);
        AuditPublishers.publishAccess(auditPublisher, streamId, accessType, programRunId);
    }

    /**
     * Creates a stream at the given location via the stream coordinator client.
     *
     * <p>Inside the coordinator callback: returns {@code null} if the stream already exists;
     * otherwise builds the {@link StreamConfig} from the creation properties (each falling back
     * to the same key in the CDAP configuration), writes the config file through the
     * impersonator, creates the notification feed, enables the explore stream, records the
     * stream in the meta store, publishes a CREATE audit event and writes system metadata.
     *
     * <p>On any failure the stream location is deleted recursively. A failure of that cleanup
     * is attached to the original exception as suppressed so the root cause is preserved.
     *
     * @param streamId the stream to create
     * @param properties creation properties; must not be {@code null}
     * @param location the stream's directory
     * @return the new stream's config, or {@code null} if the stream already existed
     * @throws Exception if creation fails
     */
    @Nullable
    private StreamConfig createStream(final StreamId streamId, final Properties properties, final Location location) throws Exception {
        try {
            return this.streamCoordinatorClient.createStream(streamId, new Callable<StreamConfig>() {
                @Override
                public StreamConfig call() throws Exception {
                    if (exists(streamId)) {
                        return null;
                    }

                    long createTime = System.currentTimeMillis();
                    long partitionDuration = Long.parseLong(properties.getProperty("stream.partition.duration", cConf.get("stream.partition.duration")));
                    long indexInterval = Long.parseLong(properties.getProperty("stream.index.interval", cConf.get("stream.index.interval")));
                    long ttl = Long.parseLong(properties.getProperty("stream.event.ttl", cConf.get("stream.event.ttl")));
                    int thresholdMB = Integer.parseInt(properties.getProperty("stream.notification.threshold", cConf.get("stream.notification.threshold")));
                    String description = properties.getProperty("stream.description");

                    FormatSpecification format = null;
                    if (properties.containsKey("stream.format.specification")) {
                        format = GSON.fromJson(properties.getProperty("stream.format.specification"), FormatSpecification.class);
                    }

                    final StreamConfig streamConfig = new StreamConfig(streamId, partitionDuration, indexInterval, ttl, location, format, thresholdMB);
                    impersonator.doAs(streamId, new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            writeConfig(streamConfig);
                            return null;
                        }
                    });

                    createStreamFeeds(streamConfig);
                    alterExploreStream(streamId, true, streamConfig.getFormat());
                    streamMetaStore.addStream(streamId, description);
                    publishAudit(streamId, AuditType.CREATE);
                    new StreamSystemMetadataWriter(metadataStore, streamId, streamConfig, createTime, description).write();
                    return streamConfig;
                }
            });
        } catch (Exception e) {
            // Remove the partially created stream directory without hiding the real failure.
            try {
                location.delete(true);
            } catch (Exception cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves the location of a stream's config file ({@code config.json} inside the stream
     * location).
     *
     * @param streamId the stream
     * @return the config file location
     * @throws IOException if the stream location cannot be resolved
     */
    public Location getConfigLocation(StreamId streamId) throws IOException {
        Location streamLocation = getStreamLocation(streamId);
        return streamLocation.append(CONFIG_FILE_NAME);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves the location of a stream: the stream's entity name appended to the stream base
     * location of its parent namespace.
     *
     * @param streamId the stream
     * @return the stream location
     * @throws IOException if the base location cannot be resolved
     */
    public Location getStreamLocation(StreamId streamId) throws IOException {
        Location namespaceStreams = getStreamBaseLocation(streamId.getParent());
        return namespaceStreams.append(streamId.getEntityName());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves the stream base location of a namespace: the configured stream base directory
     * path appended to the namespace's location.
     *
     * @param namespaceId the namespace
     * @return the base location under which the namespace's streams live
     * @throws IOException if the namespace location cannot be resolved
     */
    public Location getStreamBaseLocation(NamespaceId namespaceId) throws IOException {
        Location namespaceLocation = namespacedLocationFactory.get(namespaceId);
        return namespaceLocation.append(streamBaseDirPath);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Truncates a stream by moving it to its next generation.
     *
     * <p>Runs as the property-update callback of the stream coordinator client: computes the
     * current generation plus one, makes sure the directory of that generation exists,
     * publishes a TRUNCATE audit event and returns coordinator properties carrying only the new
     * generation.
     *
     * @param streamId the stream to truncate
     * @param location the stream's location
     * @throws Exception if the coordinator update fails
     */
    public void doTruncate(final StreamId streamId, final Location location) throws Exception {
        Callable<CoordinatorStreamProperties> bumpGeneration = new Callable<CoordinatorStreamProperties>() {
            @Override
            public CoordinatorStreamProperties call() throws Exception {
                int nextGeneration = 1 + StreamUtils.getGeneration(location);
                Location generationLocation = StreamUtils.createGenerationLocation(location, nextGeneration);
                Locations.mkdirsIfNotExists(generationLocation);
                publishAudit(streamId, AuditType.TRUNCATE);
                return new CoordinatorStreamProperties(null, null, null, Integer.valueOf(nextGeneration), null, null);
            }
        };
        streamCoordinatorClient.updateProperties(streamId, bumpGeneration);
    }

    /**
     * Drops a stream as the delete callback of the stream coordinator client.
     *
     * <p>The callback does nothing when the stream's config file does not exist. Otherwise it
     * disables the explore stream, deletes every view of the stream, deletes the config file
     * and renames the stream location into the "deleted" location (both through the
     * impersonator), then removes the stream from the meta store, its owner entry and its
     * metadata, and publishes a DELETE audit event. Any exception inside the callback is
     * rethrown via {@code Throwables.propagate}.
     *
     * @param streamId the stream to drop
     * @param location the stream's location
     * @throws Exception if the coordinator delete fails
     */
    private void doDrop(final StreamId streamId, final Location location) throws Exception {
        Runnable dropAction = new Runnable() {
            @Override
            public void run() {
                try {
                    final Location configFile = impersonator.doAs(streamId, new Callable<Location>() {
                        @Override
                        public Location call() throws Exception {
                            Location candidate = getConfigLocation(streamId);
                            return candidate.exists() ? candidate : null;
                        }
                    });
                    if (configFile == null) {
                        // No config file: nothing to clean up for this stream.
                        return;
                    }

                    // The explore stream id is derived from the location's stream name.
                    StreamId exploreStreamId = streamId.getParent().stream(StreamUtils.getStreamNameFromLocation(location));
                    alterExploreStream(exploreStreamId, false, null);

                    for (StreamViewId viewId : viewAdmin.list(streamId)) {
                        viewAdmin.delete(viewId);
                    }

                    impersonator.doAs(streamId, new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            boolean configDeleted = configFile.delete();
                            if (!configDeleted) {
                                LOG.debug("Could not delete stream config location {}", location);
                            }
                            Location deletedRoot = StreamUtils.getDeletedLocation(getStreamBaseLocation(streamId.getParent()));
                            Locations.mkdirsIfNotExists(deletedRoot);
                            // The timestamp suffix keeps repeated drops of the same name apart.
                            String deletedName = streamId.getEntityName() + System.currentTimeMillis();
                            location.renameTo(deletedRoot.append(deletedName));
                            return null;
                        }
                    });

                    streamMetaStore.removeStream(streamId);
                    ownerAdmin.delete(streamId);
                    metadataStore.removeMetadata(streamId);
                    publishAudit(streamId, AuditType.DELETE);
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        };
        streamCoordinatorClient.deleteStream(streamId, dropAction);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Merges the non-null settings of {@code streamProperties} into the stream's current configuration,
     * writes the merged configuration and then runs a {@link StreamSystemMetadataWriter} for the stream.
     *
     * <p>TTL, format and notification threshold are only overridden when the corresponding property is
     * non-null. A non-null description is stored via the stream meta store. The configuration file is
     * written through the impersonator for the given stream.
     *
     * @param streamId         the stream whose configuration is being updated
     * @param streamProperties the requested changes; {@code null} members leave the current value untouched
     * @return the TTL, format and notification threshold of the configuration as it was loaded at the
     *         beginning of this call, i.e. the values from before the update was written
     * @throws Exception if loading, writing or any of the delegated calls fails
     */
    public StreamProperties updateProperties(StreamId streamId, StreamProperties streamProperties) throws Exception {
        final StreamConfig previousConfig = getConfig(streamId);
        StreamConfig.Builder configBuilder = StreamConfig.builder(previousConfig);

        // Override only the settings that were explicitly supplied.
        Long requestedTtl = streamProperties.getTTL();
        if (requestedTtl != null) {
            configBuilder.setTTL(requestedTtl.longValue());
        }
        FormatSpecification requestedFormat = streamProperties.getFormat();
        if (requestedFormat != null) {
            configBuilder.setFormatSpec(requestedFormat);
        }
        Integer requestedThreshold = streamProperties.getNotificationThresholdMB();
        if (requestedThreshold != null) {
            configBuilder.setNotificationThreshold(requestedThreshold.intValue());
        }

        // The description is not part of the stream config; it is kept in the stream meta store.
        String description = streamProperties.getDescription();
        if (description != null) {
            this.streamMetaStore.addStream(streamId, description);
        }

        final StreamConfig updatedConfig = configBuilder.build();
        Callable<Void> persistConfig = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                FileStreamAdmin.this.writeConfig(updatedConfig);
                return null;
            }
        };
        this.impersonator.doAs(streamId, persistConfig);

        StreamSystemMetadataWriter metadataWriter =
            new StreamSystemMetadataWriter(this.metadataStore, streamId, updatedConfig, description);
        metadataWriter.write();

        // Report the values from the configuration that was in place before this update.
        Long previousTtl = Long.valueOf(previousConfig.getTTL());
        FormatSpecification previousFormat = previousConfig.getFormat();
        Integer previousThreshold = Integer.valueOf(previousConfig.getNotificationThresholdMB());
        return new StreamProperties(previousTtl, previousFormat, previousThreshold);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Serializes the given stream configuration to JSON and stores it as the stream's config file.
     *
     * <p>The JSON is written to a temporary file obtained from the config location and that temporary
     * file is then renamed to the config location of the stream. The temporary file is always removed
     * afterwards, both when the rename succeeded and when writing or renaming failed, so a failed write
     * does not leave a stray temporary file behind.
     *
     * @param streamConfig the configuration to persist; its location and stream id determine the target file
     * @throws IOException if writing the temporary file, deleting the old config or renaming fails
     */
    public void writeConfig(StreamConfig streamConfig) throws IOException {
        Location configLocation = streamConfig.getLocation().append(CONFIG_FILE_NAME);
        Location tempLocation = configLocation.getTempFile((String) null);
        try {
            CharStreams.write(GSON.toJson(streamConfig),
                              CharStreams.newWriterSupplier(Locations.newOutputSupplier(tempLocation), Charsets.UTF_8));
            // On Windows the existing config file is deleted before the rename
            // (presumably because renaming onto an existing file fails there - confirm).
            if (OSDetector.isWindows()) {
                configLocation.delete();
            }
            tempLocation.renameTo(getConfigLocation(streamConfig.getStreamId()));
        } finally {
            // Best-effort cleanup of the temporary file on every exit path.
            Locations.deleteQuietly(tempLocation);
        }
    }

    /**
     * Computes the consumer state changes needed when the number of consumer instances changes.
     *
     * <p>If the number of existing states already equals {@code i}, nothing is done. Otherwise the
     * smallest offset per event location across all existing states is collected, and:
     * <ul>
     *   <li>every existing state with an instance id below {@code i} gets a replacement state, built
     *       from those smallest offsets, added to {@code set2};</li>
     *   <li>every existing state with an instance id of {@code i} or above is added to {@code set3};</li>
     *   <li>for every instance id from the old state count up to {@code i - 1}, a fresh state built from
     *       the smallest offsets is added to {@code set2}.</li>
     * </ul>
     *
     * @param j    value passed as the first argument of each created {@link StreamConsumerState}
     *             (presumably the consumer group id - confirm against callers)
     * @param i    the target number of instances
     * @param set  the existing consumer states; only read
     * @param set2 receives the newly created states
     * @param set3 receives the existing states whose instance id is no longer valid
     */
    private void mutateStates(long j, int i, Set<StreamConsumerState> set, Set<StreamConsumerState> set2, Set<StreamConsumerState> set3) {
        int existingCount = set.size();
        if (existingCount == i) {
            // Instance count unchanged: nothing to mutate.
            return;
        }

        // Smallest offset seen for each event location, ordered by the location comparator.
        TreeMap<Location, StreamFileOffset> smallestByLocation = Maps.newTreeMap(Locations.LOCATION_COMPARATOR);
        for (StreamConsumerState state : set) {
            for (StreamFileOffset offset : state.getState()) {
                StreamFileOffset smallest = smallestByLocation.get(offset.getEventLocation());
                if (smallest == null || offset.getOffset() < smallest.getOffset()) {
                    smallestByLocation.put(offset.getEventLocation(), new StreamFileOffset(offset));
                }
            }
        }
        Collection<StreamFileOffset> smallestOffsets = smallestByLocation.values();

        // Existing instances: reset the ones that stay, hand back the ones that go.
        for (StreamConsumerState state : set) {
            if (state.getInstanceId() < i) {
                set2.add(new StreamConsumerState(j, state.getInstanceId(), smallestOffsets));
            } else {
                set3.add(state);
            }
        }

        // Newly added instances start from the smallest offsets as well.
        for (int instanceId = existingCount; instanceId < i; instanceId++) {
            set2.add(new StreamConsumerState(j, instanceId, smallestOffsets));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Enables or disables exploration of the given stream, if exploration is switched on in the
     * configuration ({@code explore.enabled}).
     *
     * <p>Failures reported by the explore facade are logged and not rethrown, so the caller continues
     * even when the explore table could not be altered. A missing explore facade while exploration is
     * enabled is reported through {@code Preconditions.checkNotNull} and does propagate.
     *
     * @param streamId            the stream to enable or disable exploration for
     * @param z                   {@code true} to enable exploration, {@code false} to disable it
     * @param formatSpecification format passed to the explore facade when enabling; not used when disabling
     */
    public void alterExploreStream(StreamId streamId, boolean z, @Nullable FormatSpecification formatSpecification) {
        if (!this.cConf.getBoolean("explore.enabled")) {
            return;
        }
        Preconditions.checkNotNull(this.exploreFacade, "Explore enabled but no ExploreFacade instance is available");
        try {
            String tableName = this.tableNaming.getTableName(streamId);
            if (!z) {
                this.exploreFacade.disableExploreStream(streamId, tableName);
                return;
            }
            this.exploreFacade.enableExploreStream(streamId, tableName, formatSpecification);
            LOG.info("Explore enabled on Stream '{}'.", streamId.getStream());
        } catch (Exception e) {
            // Deliberately best-effort: the failure is logged with its cause and swallowed.
            String message = String.format("Cannot alter exploration to %s for stream %s: %s", z, streamId, e.getMessage());
            LOG.error(message, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes an audit event of the given type for the stream, using this admin's audit publisher
     * and an empty payload.
     *
     * @param streamId  the stream the audit event refers to
     * @param auditType the kind of audit event to publish
     */
    public void publishAudit(StreamId streamId, AuditType auditType) {
        AuditPayload emptyPayload = AuditPayload.EMPTY_PAYLOAD;
        AuditPublishers.publishAudit(this.auditPublisher, streamId, auditType, emptyPayload);
    }
}
