/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedBatchedClusterStateUpdateTask;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateAckListener;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettingProviders;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;

import java.io.IOException;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream;

/**
 * Handles data stream modification requests.
 */
public class MetadataDataStreamsService {
    private static final Logger LOGGER = LogManager.getLogger(MetadataDataStreamsService.class);
    private final ClusterService clusterService;
    private final IndicesService indicesService;
    private final DataStreamGlobalRetentionSettings globalRetentionSettings;
    private final MasterServiceTaskQueue<UpdateLifecycleTask> updateLifecycleTaskQueue;
    private final MasterServiceTaskQueue<SetRolloverOnWriteTask> setRolloverOnWriteTaskQueue;
    private final MasterServiceTaskQueue<UpdateOptionsTask> updateOptionsTaskQueue;
    private final MasterServiceTaskQueue<UpdateSettingsTask> updateSettingsTaskQueue;
    private final MasterServiceTaskQueue<UpdateMappingsTask> updateMappingsTaskQueue;
    private final IndexSettingProviders indexSettingProviders;

    public MetadataDataStreamsService(
        ClusterService clusterService,
        IndicesService indicesService,
        DataStreamGlobalRetentionSettings globalRetentionSettings,
        IndexSettingProviders indexSettingProviders
    ) {
        this.clusterService = clusterService;
        this.indicesService = indicesService;
        this.globalRetentionSettings = globalRetentionSettings;
        this.indexSettingProviders = indexSettingProviders;
        ClusterStateTaskExecutor<UpdateLifecycleTask> updateLifecycleExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {

            @Override
            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
                UpdateLifecycleTask modifyLifecycleTask,
                ClusterState clusterState
            ) {
                return new Tuple<>(
                    ClusterState.builder(clusterState)
                        .putProjectMetadata(
                            updateDataLifecycle(
                                clusterState.metadata().getProject(modifyLifecycleTask.getProjectId()),
                                modifyLifecycleTask.getDataStreamNames(),
                                modifyLifecycleTask.getDataLifecycle()
                            )
                        )
                        .build(),
                    modifyLifecycleTask
                );
            }
        };
        // We chose priority high because changing the lifecycle is changing the retention of a backing index, so processing it quickly
        // can either free space when the retention is shortened, or prevent an index to be deleted when the retention is extended.
        this.updateLifecycleTaskQueue = clusterService.createTaskQueue("modify-lifecycle", Priority.HIGH, updateLifecycleExecutor);
        ClusterStateTaskExecutor<SetRolloverOnWriteTask> rolloverOnWriteExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {

            @Override
            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
                SetRolloverOnWriteTask setRolloverOnWriteTask,
                ClusterState clusterState
            ) {
                return new Tuple<>(
                    setRolloverOnWrite(
                        clusterState.projectState(setRolloverOnWriteTask.projectId()),
                        setRolloverOnWriteTask.getDataStreamName(),
                        setRolloverOnWriteTask.rolloverOnWrite(),
                        setRolloverOnWriteTask.targetFailureStore()
                    ),
                    setRolloverOnWriteTask
                );
            }
        };
        this.setRolloverOnWriteTaskQueue = clusterService.createTaskQueue(
            "data-stream-rollover-on-write",
            Priority.NORMAL,
            rolloverOnWriteExecutor
        );
        ClusterStateTaskExecutor<UpdateOptionsTask> updateOptionsExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {

            @Override
            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
                UpdateOptionsTask modifyOptionsTask,
                ClusterState clusterState
            ) {
                return new Tuple<>(
                    ClusterState.builder(clusterState)
                        .putProjectMetadata(
                            updateDataStreamOptions(
                                clusterState.metadata().getProject(modifyOptionsTask.projectId),
                                modifyOptionsTask.getDataStreamNames(),
                                modifyOptionsTask.getOptions()
                            )
                        )
                        .build(),
                    modifyOptionsTask
                );
            }
        };
        this.updateOptionsTaskQueue = clusterService.createTaskQueue("modify-data-stream-options", Priority.NORMAL, updateOptionsExecutor);
        ClusterStateTaskExecutor<UpdateSettingsTask> updateSettingsExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {

            @Override
            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
                UpdateSettingsTask updateSettingsTask,
                ClusterState clusterState
            ) throws Exception {
                DataStream dataStream = createDataStreamForUpdatedDataStreamSettings(
                    updateSettingsTask.projectId,
                    updateSettingsTask.dataStreamName,
                    updateSettingsTask.settingsOverrides,
                    clusterState
                );
                ProjectMetadata projectMetadata = clusterState.metadata().getProject(updateSettingsTask.projectId);
                ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata);
                projectMetadataBuilder.removeDataStream(updateSettingsTask.dataStreamName);
                projectMetadataBuilder.put(dataStream);
                ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build();
                return new Tuple<>(updatedClusterState, updateSettingsTask);
            }
        };
        this.updateSettingsTaskQueue = clusterService.createTaskQueue(
            "update-data-stream-settings",
            Priority.NORMAL,
            updateSettingsExecutor
        );
        ClusterStateTaskExecutor<UpdateMappingsTask> updateMappingsExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {

            @Override
            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
                UpdateMappingsTask updateMappingsTask,
                ClusterState clusterState
            ) throws Exception {
                DataStream dataStream = createDataStreamForUpdatedDataStreamMappings(
                    updateMappingsTask.projectId,
                    updateMappingsTask.dataStreamName,
                    updateMappingsTask.mappingsOverrides,
                    clusterState
                );
                ProjectMetadata projectMetadata = clusterState.metadata().getProject(updateMappingsTask.projectId);
                ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata);
                projectMetadataBuilder.removeDataStream(updateMappingsTask.dataStreamName);
                projectMetadataBuilder.put(dataStream);
                ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build();
                return new Tuple<>(updatedClusterState, updateMappingsTask);
            }
        };
        this.updateMappingsTaskQueue = clusterService.createTaskQueue(
            "update-data-stream-mappings",
            Priority.NORMAL,
            updateMappingsExecutor
        );
    }

    public void modifyDataStream(
        final ProjectId projectId,
        final ModifyDataStreamsAction.Request request,
        final ActionListener<AcknowledgedResponse> listener
    ) {
        if (request.getActions().size() == 0) {
            listener.onResponse(AcknowledgedResponse.TRUE);
        } else {
            submitUnbatchedTask("update-backing-indices", new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {
                @Override
                public ClusterState execute(ClusterState currentState) {
                    final var project = modifyDataStream(
                        currentState.metadata().getProject(projectId),
                        request.getActions(),
                        indexMetadata -> {
                            try {
                                return indicesService.createIndexMapperServiceForValidation(indexMetadata);
                            } catch (IOException e) {
                                throw new IllegalStateException(e);
                            }
                        },
                        clusterService.getSettings()
                    );
                    return ClusterState.builder(currentState).putProjectMetadata(project).build();
                }
            });
        }
    }

    /**
     * Submits the task to set the lifecycle to the requested data streams.
     */
    public void setLifecycle(
        final ProjectId projectId,
        final List<String> dataStreamNames,
        DataStreamLifecycle lifecycle,
        TimeValue ackTimeout,
        TimeValue masterTimeout,
        final ActionListener<AcknowledgedResponse> listener
    ) {
        updateLifecycleTaskQueue.submitTask(
            "set-lifecycle",
            new UpdateLifecycleTask(projectId, dataStreamNames, lifecycle, ackTimeout, listener),
            masterTimeout
        );
    }

    /**
     * Submits the task to remove the lifecycle from the requested data streams.
     */
    public void removeLifecycle(
        ProjectId projectId,
        List<String> dataStreamNames,
        TimeValue ackTimeout,
        TimeValue masterTimeout,
        ActionListener<AcknowledgedResponse> listener
    ) {
        updateLifecycleTaskQueue.submitTask(
            "delete-lifecycle",
            new UpdateLifecycleTask(projectId, dataStreamNames, null, ackTimeout, listener),
            masterTimeout
        );
    }

    /**
     * Submits the task to set the provided data stream options to the requested data streams.
     */
    public void setDataStreamOptions(
        final ProjectId projectId,
        final List<String> dataStreamNames,
        DataStreamOptions options,
        TimeValue ackTimeout,
        TimeValue masterTimeout,
        final ActionListener<AcknowledgedResponse> listener
    ) {
        updateOptionsTaskQueue.submitTask(
            "set-data-stream-options",
            new UpdateOptionsTask(projectId, dataStreamNames, options, ackTimeout, listener),
            masterTimeout
        );
    }

    /**
     * Submits the task to remove the data stream options from the requested data streams.
     */
    public void removeDataStreamOptions(
        ProjectId projectId,
        List<String> dataStreamNames,
        TimeValue ackTimeout,
        TimeValue masterTimeout,
        ActionListener<AcknowledgedResponse> listener
    ) {
        updateOptionsTaskQueue.submitTask(
            "delete-data-stream-options",
            new UpdateOptionsTask(projectId, dataStreamNames, null, ackTimeout, listener),
            masterTimeout
        );
    }

    @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
    private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
        clusterService.submitUnbatchedStateUpdateTask(source, task);
    }

    /**
     * Submits the task to signal that the next time this data stream receives a document, it will be rolled over.
     */
    public void setRolloverOnWrite(
        ProjectId projectId,
        String dataStreamName,
        boolean rolloverOnWrite,
        boolean targetFailureStore,
        TimeValue ackTimeout,
        TimeValue masterTimeout,
        ActionListener<AcknowledgedResponse> listener
    ) {
        setRolloverOnWriteTaskQueue.submitTask(
            "set-rollover-on-write",
            new SetRolloverOnWriteTask(projectId, dataStreamName, rolloverOnWrite, targetFailureStore, ackTimeout, listener),
            masterTimeout
        );
    }

    /**
     * Computes the resulting cluster state after applying all requested data stream modifications in order.
     *
     * @param currentProject current project metadata
     * @param actions ordered list of modifications to perform
     * @return resulting cluster state after all modifications have been performed
     */
    static ProjectMetadata modifyDataStream(
        ProjectMetadata currentProject,
        Iterable<DataStreamAction> actions,
        Function<IndexMetadata, MapperService> mapperSupplier,
        Settings nodeSettings
    ) {
        var updatedProject = currentProject;
        for (var action : actions) {
            ProjectMetadata.Builder builder = ProjectMetadata.builder(updatedProject);
            if (action.getType() == DataStreamAction.Type.ADD_BACKING_INDEX) {
                addBackingIndex(
                    updatedProject,
                    builder,
                    mapperSupplier,
                    action.getDataStream(),
                    action.getIndex(),
                    action.isFailureStore(),
                    nodeSettings
                );
            } else if (action.getType() == DataStreamAction.Type.REMOVE_BACKING_INDEX) {
                removeBackingIndex(updatedProject, builder, action.getDataStream(), action.getIndex(), action.isFailureStore());
            } else {
                throw new IllegalStateException("unsupported data stream action type [" + action.getClass().getName() + "]");
            }
            updatedProject = builder.build();
        }

        return updatedProject;
    }

    /**
     * Creates an updated cluster state in which the requested data streams have the data stream lifecycle provided.
     * Visible for testing.
     */
    ProjectMetadata updateDataLifecycle(ProjectMetadata project, List<String> dataStreamNames, @Nullable DataStreamLifecycle lifecycle) {
        ProjectMetadata.Builder builder = ProjectMetadata.builder(project);
        boolean onlyInternalDataStreams = true;
        for (var dataStreamName : dataStreamNames) {
            var dataStream = validateDataStream(project, dataStreamName);
            builder.put(dataStream.copy().setLifecycle(lifecycle).build());
            onlyInternalDataStreams = onlyInternalDataStreams && dataStream.isInternal();
        }
        if (lifecycle != null) {
            // We don't issue any warnings if all data streams are internal data streams
            lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(false), onlyInternalDataStreams);
        }
        return builder.build();
    }

    /**
     * Creates an updated cluster state in which the requested data streams have the data stream options provided.
     * Visible for testing.
     */
    ProjectMetadata updateDataStreamOptions(
        ProjectMetadata project,
        List<String> dataStreamNames,
        @Nullable DataStreamOptions dataStreamOptions
    ) {
        ProjectMetadata.Builder builder = ProjectMetadata.builder(project);
        boolean onlyInternalDataStreams = true;
        for (var dataStreamName : dataStreamNames) {
            var dataStream = validateDataStream(project, dataStreamName);
            builder.put(dataStream.copy().setDataStreamOptions(dataStreamOptions).build());
            onlyInternalDataStreams = onlyInternalDataStreams && dataStream.isInternal();
        }
        if (dataStreamOptions != null && dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().lifecycle() != null) {
            // We don't issue any warnings if all data streams are internal data streams
            dataStreamOptions.failureStore()
                .lifecycle()
                .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(true), onlyInternalDataStreams);
        }
        return builder.build();
    }

    /**
     * Creates an updated cluster state in which the requested data stream has the flag {@link DataStream#rolloverOnWrite()}
     * set to the value of the parameter rolloverOnWrite
     *
     * @param currentState the initial project state
     * @param dataStreamName the name of the data stream to be updated
     * @param rolloverOnWrite the value of the flag
     * @param targetFailureStore whether this rollover targets the failure store or the backing indices
     * @return the updated cluster state
     */
    public static ClusterState setRolloverOnWrite(
        ProjectState currentState,
        String dataStreamName,
        boolean rolloverOnWrite,
        boolean targetFailureStore
    ) {
        var metadata = currentState.metadata();
        var dataStream = validateDataStream(metadata, dataStreamName);
        var indices = dataStream.getDataStreamIndices(targetFailureStore);
        if (indices.isRolloverOnWrite() == rolloverOnWrite) {
            return currentState.cluster();
        }
        return currentState.updatedState(
            builder -> builder.put(
                dataStream.copy()
                    .setDataStreamIndices(targetFailureStore, indices.copy().setRolloverOnWrite(rolloverOnWrite).build())
                    .build()
            )
        );
    }

    public void updateSettings(
        ProjectId projectId,
        TimeValue masterNodeTimeout,
        TimeValue ackTimeout,
        String dataStreamName,
        Settings settingsOverrides,
        boolean dryRun,
        ActionListener<DataStream> listener
    ) {
        if (dryRun) {
            /*
             * If this is a dry run, we'll do the settings validation and apply the changes to the data stream locally, but we won't run
             * the task that actually updates the cluster state.
             */
            try {
                DataStream updatedDataStream = createDataStreamForUpdatedDataStreamSettings(
                    projectId,
                    dataStreamName,
                    settingsOverrides,
                    clusterService.state()
                );
                listener.onResponse(updatedDataStream);
            } catch (Exception e) {
                listener.onFailure(e);
            }
        } else {
            UpdateSettingsTask updateSettingsTask = new UpdateSettingsTask(
                projectId,
                dataStreamName,
                settingsOverrides,
                clusterService,
                ackTimeout,
                listener
            );
            updateSettingsTaskQueue.submitTask("updating settings on data stream", updateSettingsTask, masterNodeTimeout);
        }
    }

    /*
     * This method validates that the settings won't cause any validation problems with existing templates. If successful, a copy of the
     * data stream is returned with the new settings applied.
     */
    private DataStream createDataStreamForUpdatedDataStreamSettings(
        ProjectId projectId,
        String dataStreamName,
        Settings settingsOverrides,
        ClusterState clusterState
    ) throws Exception {
        ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId);
        Map<String, DataStream> dataStreamMap = projectMetadata.dataStreams();
        DataStream dataStream = dataStreamMap.get(dataStreamName);
        Settings existingDataStreamSettings = dataStream.getSettings();

        Settings.Builder mergedSettingsBuilder = Settings.builder().put(existingDataStreamSettings).put(settingsOverrides);
        /*
         * A null value for a setting override means that we remove it from the data stream, and let the value from the template (if any)
         * be used.
         */
        settingsOverrides.keySet().forEach(key -> {
            if (mergedSettingsBuilder.get(key) == null) {
                mergedSettingsBuilder.remove(key);
            }
        });
        Settings mergedDataStreamSettings = mergedSettingsBuilder.build();

        final ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, projectMetadata);
        Settings templateSettings = MetadataIndexTemplateService.resolveSettings(template, projectMetadata.componentTemplates());
        Settings mergedEffectiveSettings = templateSettings.merge(mergedDataStreamSettings);
        CompressedXContent effectiveMappings = dataStream.getEffectiveMappings(projectMetadata, indicesService);
        MetadataIndexTemplateService.validateTemplate(
            addSettingsFromIndexSettingProviders(dataStreamName, effectiveMappings, projectMetadata, mergedEffectiveSettings),
            effectiveMappings,
            indicesService
        );

        return dataStream.copy().setSettings(mergedDataStreamSettings).build();
    }

    private Settings addSettingsFromIndexSettingProviders(
        String dataStreamName,
        CompressedXContent effectiveMappings,
        ProjectMetadata projectMetadata,
        Settings settings
    ) {
        Settings.Builder additionalSettings = Settings.builder();
        IndexMode indexMode = projectMetadata.dataStreams().get(dataStreamName).getIndexMode();
        Set<String> overrulingSettings = new HashSet<>();
        indexSettingProviders.getIndexSettingProviders().forEach(indexSettingProvider -> {
            Settings.Builder providerSettingsBuilder = Settings.builder();
            indexSettingProvider.provideAdditionalSettings(
                dataStreamName,
                dataStreamName,
                indexMode,
                projectMetadata,
                Instant.now(),
                settings,
                List.of(effectiveMappings),
                IndexVersion.current(),
                providerSettingsBuilder
            );
            Settings providerSettings = providerSettingsBuilder.build();
            if (indexSettingProvider.overrulesTemplateAndRequestSettings()) {
                overrulingSettings.addAll(providerSettings.keySet());
            }
            additionalSettings.put(providerSettings);
        });
        Settings filteredmergedEffectiveSettings = settings;
        if (overrulingSettings.isEmpty() == false) {
            // Filter any conflicting settings from overruling providers, to avoid overwriting their values from templates.
            final Settings.Builder filtered = Settings.builder().put(settings);
            for (String setting : overrulingSettings) {
                filtered.remove(setting);
            }
            filteredmergedEffectiveSettings = filtered.build();
        }
        return additionalSettings.put(filteredmergedEffectiveSettings).build();
    }

    private DataStream createDataStreamForUpdatedDataStreamMappings(
        ProjectId projectId,
        String dataStreamName,
        CompressedXContent mappingsOverrides,
        ClusterState clusterState
    ) throws Exception {
        ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId);
        Map<String, DataStream> dataStreamMap = projectMetadata.dataStreams();
        DataStream dataStream = dataStreamMap.get(dataStreamName);

        final ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, projectMetadata);
        CompressedXContent effectiveMappings = DataStream.getEffectiveMappings(
            projectMetadata,
            template,
            mappingsOverrides,
            dataStream.getWriteIndex(),
            indicesService
        );
        MetadataIndexTemplateService.validateTemplate(
            getEffectiveSettings(projectMetadata, dataStream, mappingsOverrides),
            effectiveMappings,
            indicesService
        );
        return dataStream.copy().setMappings(mappingsOverrides).build();
    }

    /**
     * This method gets the effective settings for the given data stream. The effective settings include the combination of template
     * settings, data stream settings overrides, and the implicit settings provide by IndexSettingsProviders.
     * @param projectMetadata The project metadata
     * @param dataStream The data stream
     * @return The effective settings for the data stream, which are a the combination of template settings, data stream settings overrides,
     * and the implicit settings provide by IndexSettingsProviders
     * @throws IOException
     */
    public Settings getEffectiveSettings(ProjectMetadata projectMetadata, DataStream dataStream) throws IOException {
        return getEffectiveSettings(projectMetadata, dataStream, dataStream.getMappings());
    }

    /**
     * This method gets the effective settings for the given data stream, using the passed-in mappingOverrides rather than the data stream's
     * mapping overrides. This can be used to evaluate the validity of the settings and mappings before applying the mapping overrides to
     * the data stream. The effective settings include the combination of template settings, data stream settings overrides, and the
     * implicit settings provide by IndexSettingsProviders.
     * @param projectMetadata The project metadata
     * @param dataStream The data stream
     * @param mappingOverrides The mapping overrides to be used in place of the mapping overrides on the data stream currently
     * @return The effective settings for the data stream, which are a the combination of template settings, data stream settings overrides,
     * and the implicit settings provide by IndexSettingsProviders
     * @throws IOException
     */
    public Settings getEffectiveSettings(ProjectMetadata projectMetadata, DataStream dataStream, CompressedXContent mappingOverrides)
        throws IOException {
        ComposableIndexTemplate template = lookupTemplateForDataStream(dataStream.getName(), projectMetadata);
        Settings templateSettings = MetadataIndexTemplateService.resolveSettings(template, projectMetadata.componentTemplates());
        CompressedXContent effectiveMappings = DataStream.getEffectiveMappings(
            projectMetadata,
            template,
            mappingOverrides,
            dataStream.getWriteIndex(),
            indicesService
        );
        return addSettingsFromIndexSettingProviders(
            dataStream.getName(),
            effectiveMappings,
            projectMetadata,
            templateSettings.merge(dataStream.getSettings())
        );
    }

    public void updateMappings(
        ProjectId projectId,
        TimeValue masterNodeTimeout,
        TimeValue ackTimeout,
        String dataStreamName,
        CompressedXContent mappingsOverrides,
        boolean dryRun,
        ActionListener<DataStream> listener
    ) {
        if (dryRun) {
            /*
             * If this is a dry run, we'll do the settings validation and apply the changes to the data stream locally, but we won't run
             * the task that actually updates the cluster state.
             */
            try {
                DataStream updatedDataStream = createDataStreamForUpdatedDataStreamMappings(
                    projectId,
                    dataStreamName,
                    mappingsOverrides,
                    clusterService.state()
                );
                listener.onResponse(updatedDataStream);
            } catch (Exception e) {
                listener.onFailure(e);
            }
        } else {
            updateMappingsTaskQueue.submitTask(
                "updating mappings on data stream",
                new UpdateMappingsTask(projectId, dataStreamName, mappingsOverrides, clusterService, ackTimeout, listener),
                masterNodeTimeout
            );
        }
    }

    private static void addBackingIndex(
        ProjectMetadata project,
        ProjectMetadata.Builder builder,
        Function<IndexMetadata, MapperService> mapperSupplier,
        String dataStreamName,
        String indexName,
        boolean failureStore,
        Settings nodeSettings
    ) {
        var dataStream = validateDataStream(project, dataStreamName);
        var index = validateIndex(project, indexName);

        try {
            MetadataMigrateToDataStreamService.prepareBackingIndex(
                builder,
                project.index(index.getWriteIndex()),
                dataStreamName,
                mapperSupplier,
                false,
                failureStore,
                dataStream.isSystem(),
                nodeSettings
            );
        } catch (IOException e) {
            throw new IllegalArgumentException("unable to prepare backing index", e);
        }

        // add index to data stream
        if (failureStore) {
            builder.put(dataStream.addFailureStoreIndex(project, index.getWriteIndex()));
        } else {
            builder.put(dataStream.addBackingIndex(project, index.getWriteIndex()));
        }
    }

    private static void removeBackingIndex(
        ProjectMetadata project,
        ProjectMetadata.Builder builder,
        String dataStreamName,
        String indexName,
        boolean failureStore
    ) {
        boolean indexNotRemoved = true;
        DataStream dataStream = validateDataStream(project, dataStreamName);
        List<Index> targetIndices = failureStore ? dataStream.getFailureIndices() : dataStream.getIndices();
        for (Index backingIndex : targetIndices) {
            if (backingIndex.getName().equals(indexName)) {
                if (failureStore) {
                    builder.put(dataStream.removeFailureStoreIndex(backingIndex));
                } else {
                    builder.put(dataStream.removeBackingIndex(backingIndex));
                }
                indexNotRemoved = false;
                break;
            }
        }

        if (indexNotRemoved) {
            throw new IllegalArgumentException("index [" + indexName + "] not found");
        }

        // un-hide index
        var indexMetadata = builder.get(indexName);
        if (indexMetadata != null) {
            builder.put(
                IndexMetadata.builder(indexMetadata)
                    .settings(Settings.builder().put(indexMetadata.getSettings()).put("index.hidden", "false").build())
                    .settingsVersion(indexMetadata.getSettingsVersion() + 1)
            );
        }
    }

    private static DataStream validateDataStream(ProjectMetadata project, String dataStreamName) {
        IndexAbstraction dataStream = project.getIndicesLookup().get(dataStreamName);
        if (dataStream == null || dataStream.getType() != IndexAbstraction.Type.DATA_STREAM) {
            throw new IllegalArgumentException("data stream [" + dataStreamName + "] not found");
        }
        return (DataStream) dataStream;
    }

    private static IndexAbstraction validateIndex(ProjectMetadata project, String indexName) {
        IndexAbstraction index = project.getIndicesLookup().get(indexName);
        if (index == null || index.getType() != IndexAbstraction.Type.CONCRETE_INDEX) {
            throw new IllegalArgumentException("index [" + indexName + "] not found");
        }
        return index;
    }

    /**
     * Removes the given data stream and their backing indices from the Project State.
     *
     * @param projectState The project state
     * @param dataStreams  The data streams to remove
     * @param settings     The settings
     * @return The updated Project State
     */
    public static ClusterState deleteDataStreams(ProjectState projectState, Set<DataStream> dataStreams, Settings settings) {
        if (dataStreams.isEmpty()) {
            return projectState.cluster();
        }

        Set<String> dataStreamNames = dataStreams.stream().map(DataStream::getName).collect(Collectors.toSet());
        Set<String> snapshottingDataStreams = SnapshotsServiceUtils.snapshottingDataStreams(projectState, dataStreamNames);
        if (snapshottingDataStreams.isEmpty() == false) {
            throw new SnapshotInProgressException(
                "Cannot delete data streams that are being snapshotted: ["
                    + String.join(", ", snapshottingDataStreams)
                    + "]. Try again after snapshot finishes or cancel the currently running snapshot."
            );
        }

        Set<Index> backingIndicesToRemove = new HashSet<>();
        for (DataStream dataStream : dataStreams) {
            assert dataStream != null;
            if (projectState.metadata().dataStreams().get(dataStream.getName()) == null) {
                throw new ResourceNotFoundException("data stream [" + dataStream.getName() + "] not found");
            }
            backingIndicesToRemove.addAll(dataStream.getIndices());
            backingIndicesToRemove.addAll(dataStream.getFailureIndices());
        }

        // first delete the data streams and then the indices:
        // (this to avoid data stream validation from failing when deleting an index that is part of a data stream
        // without updating the data stream)
        // TODO: change order when "delete index api" also updates the data stream the "index to be removed" is a member of
        ClusterState newState = projectState.updatedState(builder -> {
            for (DataStream ds : dataStreams) {
                LOGGER.info("removing data stream [{}]", ds.getName());
                builder.removeDataStream(ds.getName());
            }
        });
        return MetadataDeleteIndexService.deleteIndices(newState.projectState(projectState.projectId()), backingIndicesToRemove, settings);
    }

    /**
     * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
     */
    static class UpdateLifecycleTask extends AckedBatchedClusterStateUpdateTask {

        private final ProjectId projectId;
        private final List<String> dataStreamNames;
        private final DataStreamLifecycle lifecycle;

        UpdateLifecycleTask(
            ProjectId projectId,
            List<String> dataStreamNames,
            @Nullable DataStreamLifecycle lifecycle,
            TimeValue ackTimeout,
            ActionListener<AcknowledgedResponse> listener
        ) {
            super(ackTimeout, listener);
            this.projectId = projectId;
            this.dataStreamNames = dataStreamNames;
            this.lifecycle = lifecycle;
        }

        public ProjectId getProjectId() {
            return projectId;
        }

        public List<String> getDataStreamNames() {
            return dataStreamNames;
        }

        public DataStreamLifecycle getDataLifecycle() {
            return lifecycle;
        }
    }

    /**
     * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
     */
    static class UpdateOptionsTask extends AckedBatchedClusterStateUpdateTask {
        ProjectId projectId;
        private final List<String> dataStreamNames;
        private final DataStreamOptions options;

        UpdateOptionsTask(
            ProjectId projectId,
            List<String> dataStreamNames,
            @Nullable DataStreamOptions options,
            TimeValue ackTimeout,
            ActionListener<AcknowledgedResponse> listener
        ) {
            super(ackTimeout, listener);
            this.projectId = projectId;
            this.dataStreamNames = dataStreamNames;
            this.options = options;
        }

        public ProjectId getProjectId() {
            return projectId;
        }

        public List<String> getDataStreamNames() {
            return dataStreamNames;
        }

        public DataStreamOptions getOptions() {
            return options;
        }
    }

    /**
     * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
     */
    static class SetRolloverOnWriteTask extends AckedBatchedClusterStateUpdateTask {

        private final ProjectId projectId;
        private final String dataStreamName;
        private final boolean rolloverOnWrite;
        private final boolean targetFailureStore;

        SetRolloverOnWriteTask(
            ProjectId projectId,
            String dataStreamName,
            boolean rolloverOnWrite,
            boolean targetFailureStore,
            TimeValue ackTimeout,
            ActionListener<AcknowledgedResponse> listener
        ) {
            super(ackTimeout, listener);
            this.projectId = projectId;
            this.dataStreamName = dataStreamName;
            this.rolloverOnWrite = rolloverOnWrite;
            this.targetFailureStore = targetFailureStore;
        }

        public ProjectId projectId() {
            return projectId;
        }

        public String getDataStreamName() {
            return dataStreamName;
        }

        public boolean rolloverOnWrite() {
            return rolloverOnWrite;
        }

        public boolean targetFailureStore() {
            return targetFailureStore;
        }
    }

    static class UpdateSettingsTask extends AckedBatchedClusterStateUpdateTask {
        final ProjectId projectId;
        private final String dataStreamName;
        private final Settings settingsOverrides;

        UpdateSettingsTask(
            ProjectId projectId,
            String dataStreamName,
            Settings settingsOverrides,
            ClusterService clusterService,
            TimeValue ackTimeout,
            ActionListener<DataStream> listener
        ) {
            super(ackTimeout, listener.safeMap(response -> {
                if (response.isAcknowledged()) {
                    return clusterService.state().metadata().getProject(projectId).dataStreams().get(dataStreamName);
                } else {
                    throw new ElasticsearchException("Updating settings not accepted for unknown reasons");
                }
            }));
            this.projectId = projectId;
            this.dataStreamName = dataStreamName;
            this.settingsOverrides = settingsOverrides;
        }
    }

    static class UpdateMappingsTask extends AckedBatchedClusterStateUpdateTask {
        final ProjectId projectId;
        private final String dataStreamName;
        private final CompressedXContent mappingsOverrides;

        UpdateMappingsTask(
            ProjectId projectId,
            String dataStreamName,
            CompressedXContent mappingsOverrides,
            ClusterService clusterService,
            TimeValue ackTimeout,
            ActionListener<DataStream> listener
        ) {
            super(ackTimeout, listener.safeMap(response -> {
                if (response.isAcknowledged()) {
                    return clusterService.state().projectState(projectId).metadata().dataStreams().get(dataStreamName);
                } else {
                    throw new ElasticsearchException("Updating mappings not accepted for unknown reasons");
                }
            }));
            this.projectId = projectId;
            this.dataStreamName = dataStreamName;
            this.mappingsOverrides = mappingsOverrides;
        }
    }
}
