/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */
package org.elasticsearch.xpack.aggregatemetric.mapper;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateMathParser;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.fielddata.ScriptDocValues.DoublesSupplier;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperBuilderContext;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.SimpleMappedFieldType;
import org.elasticsearch.index.mapper.SortedNumericDocValuesSyntheticFieldLoader;
import org.elasticsearch.index.mapper.SourceValueFetcher;
import org.elasticsearch.index.mapper.TextSearchInfo;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType;
import org.elasticsearch.index.mapper.ValueFetcher;
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
import org.elasticsearch.index.mapper.blockloader.ConstantNull;
import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.IntsBlockLoader;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.script.ScriptCompiler;
import org.elasticsearch.script.field.DelegateDocValuesField;
import org.elasticsearch.script.field.DocValuesScriptFieldFactory;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.CopyingXContentParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentSubParser;
import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSourceType;
import org.elasticsearch.xpack.aggregatemetric.fielddata.IndexAggregateMetricDoubleFieldData;
import org.elasticsearch.xpack.aggregatemetric.fielddata.LeafAggregateMetricDoubleFieldData;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/** A {@link FieldMapper} for a field containing aggregate metrics such as min/max/value_count etc. */
public class AggregateMetricDoubleFieldMapper extends FieldMapper {

    private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(AggregateMetricDoubleFieldMapper.class);

    public static final String CONTENT_TYPE = "aggregate_metric_double";
    public static final String SUBFIELD_SEPARATOR = ".";

    private static AggregateMetricDoubleFieldMapper toType(FieldMapper in) {
        return (AggregateMetricDoubleFieldMapper) in;
    }

    /**
     * Return the name of a subfield of an aggregate metric field
     *
     * @param fieldName the name of the aggregate metric field
     * @param metric    the metric type the subfield corresponds to
     * @return the name of the subfield
     */
    public static String subfieldName(String fieldName, Metric metric) {
        return fieldName + AggregateMetricDoubleFieldMapper.SUBFIELD_SEPARATOR + metric.name();
    }

    /**
     * Mapping field names
     */
    public static class Names {
        public static final String IGNORE_MALFORMED = "ignore_malformed";
        public static final String METRICS = "metrics";
        public static final String DEFAULT_METRIC = "default_metric";
    }

    /**
     * Enum of aggregate metrics supported by this field mapper
     */
    public enum Metric {
        min,
        max,
        sum,
        value_count
    }

    public static class Defaults {
        public static final EnumSet<Metric> METRICS = EnumSet.noneOf(Metric.class);
    }

    public static final class Builder extends FieldMapper.Builder {

        private final Parameter<Map<String, String>> meta = Parameter.metaParam();

        private final Parameter<Boolean> ignoreMalformed;

        private final Parameter<EnumSet<Metric>> metrics = new Parameter<>(Names.METRICS, false, () -> Defaults.METRICS, (n, c, o) -> {
            @SuppressWarnings("unchecked")
            List<String> metricsList = (List<String>) o;
            EnumSet<Metric> parsedMetrics = EnumSet.noneOf(Metric.class);
            for (String s : metricsList) {
                try {
                    Metric m = Metric.valueOf(s);
                    parsedMetrics.add(m);
                } catch (IllegalArgumentException e) {
                    throw new IllegalArgumentException("Metric [" + s + "] is not supported.", e);
                }
            }
            return parsedMetrics;
        }, m -> toType(m).metrics, XContentBuilder::enumSet, Objects::toString).addValidator(v -> {
            if (v == null || v.isEmpty()) {
                throw new IllegalArgumentException("Property [" + Names.METRICS + "] is required for field [" + leafName() + "].");
            }
        });

        /**
         * Parameter that marks this field as a time series metric defining its time series metric type.
         * For {@link AggregateMetricDoubleFieldMapper} fields gauge, counter and summary metric types are
         * supported.
         */
        private final Parameter<MetricType> timeSeriesMetric;

        /**
         * Set the default metric so that query operations are delegated to it.
         */
        private final Parameter<Metric> defaultMetric = new Parameter<>(Names.DEFAULT_METRIC, false, () -> null, (n, c, o) -> {
            try {
                return Metric.valueOf(o.toString());
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Metric [" + o.toString() + "] is not supported.", e);
            }
        }, m -> toType(m).defaultMetric, XContentBuilder::field, Objects::toString);

        private final IndexSettings indexSettings;

        public Builder(String name, IndexSettings indexSettings) {
            super(name);
            this.ignoreMalformed = Parameter.boolParam(
                Names.IGNORE_MALFORMED,
                true,
                m -> toType(m).ignoreMalformed,
                IGNORE_MALFORMED_SETTING.get(indexSettings.getSettings())
            );

            this.timeSeriesMetric = TimeSeriesParams.metricParam(m -> toType(m).metricType, MetricType.GAUGE);
            this.indexSettings = indexSettings;
        }

        @Override
        protected Parameter<?>[] getParameters() {
            return new Parameter<?>[] { ignoreMalformed, metrics, defaultMetric, meta, timeSeriesMetric };
        }

        public Builder metric(MetricType metric) {
            this.timeSeriesMetric.setValue(metric);
            return this;
        }

        @Override
        public AggregateMetricDoubleFieldMapper build(MapperBuilderContext context) {
            if (multiFieldsBuilder.hasMultiFields()) {
                DEPRECATION_LOGGER.warn(
                    DeprecationCategory.MAPPINGS,
                    CONTENT_TYPE + "_multifields",
                    "Adding multifields to [" + CONTENT_TYPE + "] mappers has no effect and will be forbidden in future"
                );
            }
            if (defaultMetric.isConfigured() == false) {
                // If a single metric is contained, this should be the default
                if (metrics.getValue().size() == 1) {
                    Metric m = metrics.getValue().iterator().next();
                    defaultMetric.setValue(m);
                }

                if (metrics.getValue().contains(defaultMetric.getValue()) == false) {
                    throw new IllegalArgumentException(
                        "Property [" + Names.DEFAULT_METRIC + "] is required for field [" + leafName() + "]."
                    );
                }
            }

            if (metrics.getValue().contains(defaultMetric.getValue()) == false) {
                // The default_metric is not defined in the "metrics" field
                throw new IllegalArgumentException(
                    "Default metric [" + defaultMetric.getValue() + "] is not defined in the metrics of field [" + leafName() + "]."
                );
            }

            EnumMap<Metric, NumberFieldMapper> metricMappers = new EnumMap<>(Metric.class);
            // Instantiate one NumberFieldMapper instance for each metric
            for (Metric m : this.metrics.getValue()) {
                String fieldName = subfieldName(leafName(), m);
                NumberFieldMapper.Builder builder;

                if (m == Metric.value_count) {
                    // value_count metric can only be an integer and not a double
                    builder = new NumberFieldMapper.Builder(
                        fieldName,
                        NumberFieldMapper.NumberType.INTEGER,
                        ScriptCompiler.NONE,
                        indexSettings
                    ).allowMultipleValues(false).ignoreMalformed(false).coerce(false);
                    if (indexSettings.getIndexVersionCreated().onOrAfter(IndexVersions.AGG_METRIC_DOUBLE_REMOVE_POINTS)) {
                        builder.index(false);
                    }
                } else {
                    builder = new NumberFieldMapper.Builder(
                        fieldName,
                        NumberFieldMapper.NumberType.DOUBLE,
                        ScriptCompiler.NONE,
                        indexSettings
                    ).allowMultipleValues(false).ignoreMalformed(false).coerce(true);
                    if (indexSettings.getIndexVersionCreated().onOrAfter(IndexVersions.AGG_METRIC_DOUBLE_REMOVE_POINTS)) {
                        builder.index(false);
                    }
                }
                NumberFieldMapper fieldMapper = builder.build(context);
                metricMappers.put(m, fieldMapper);
            }

            EnumMap<Metric, NumberFieldMapper.NumberFieldType> metricFields = metricMappers.entrySet()
                .stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().fieldType(), (l, r) -> {
                    throw new IllegalArgumentException("Duplicate keys " + l + "and " + r + ".");
                }, () -> new EnumMap<>(Metric.class)));

            AggregateMetricDoubleFieldType metricFieldType = new AggregateMetricDoubleFieldType(
                context.buildFullName(leafName()),
                timeSeriesMetric.getValue(),
                defaultMetric.getValue(),
                metricFields,
                meta.getValue()
            );

            return new AggregateMetricDoubleFieldMapper(leafName(), metricFieldType, metricMappers, builderParams(this, context), this);
        }
    }

    public static final FieldMapper.TypeParser PARSER = new TypeParser(
        (n, c) -> new Builder(n, c.getIndexSettings()),
        notInMultiFields(CONTENT_TYPE)
    );

    public static final class AggregateMetricDoubleFieldType extends SimpleMappedFieldType {

        private final EnumMap<Metric, NumberFieldMapper.NumberFieldType> metricFields;
        private final Metric defaultMetric;
        private final MetricType metricType;

        public AggregateMetricDoubleFieldType(
            String name,
            MetricType metricType,
            Metric defaultMetric,
            EnumMap<Metric, NumberFieldMapper.NumberFieldType> metricFields,
            Map<String, String> meta
        ) {
            super(name, metricFields.get(defaultMetric).indexType(), false, meta);
            this.metricType = metricType;
            this.defaultMetric = defaultMetric;
            this.metricFields = metricFields;
        }

        /**
         * Return a delegate field type for a given metric sub-field
         * @return a field type
         */
        private NumberFieldMapper.NumberFieldType delegateFieldType(Metric metric) {
            return metricFields.get(metric);
        }

        /**
         * Return a delegate field type for the default metric sub-field
         * @return a field type
         */
        private NumberFieldMapper.NumberFieldType delegateFieldType() {
            return delegateFieldType(defaultMetric);
        }

        @Override
        public String typeName() {
            return CONTENT_TYPE;
        }

        @Override
        public TextSearchInfo getTextSearchInfo() {
            return TextSearchInfo.SIMPLE_MATCH_WITHOUT_TERMS;
        }

        public Map<Metric, NumberFieldMapper.NumberFieldType> getMetricFields() {
            return Collections.unmodifiableMap(metricFields);
        }

        Metric getDefaultMetric() {
            return defaultMetric;
        }

        @Override
        public boolean mayExistInIndex(SearchExecutionContext context) {
            return delegateFieldType().mayExistInIndex(context);    // TODO how does searching actually work here?
        }

        @Override
        public Query existsQuery(SearchExecutionContext context) {
            return delegateFieldType().existsQuery(context);
        }

        @Override
        public Query termQuery(Object value, SearchExecutionContext context) {
            if (value == null) {
                throw new IllegalArgumentException("Cannot search for null.");
            }
            return delegateFieldType().termQuery(value, context);
        }

        @Override
        public Query termsQuery(Collection<?> values, SearchExecutionContext context) {
            return delegateFieldType().termsQuery(values, context);
        }

        @Override
        public Query rangeQuery(
            Object lowerTerm,
            Object upperTerm,
            boolean includeLower,
            boolean includeUpper,
            SearchExecutionContext context
        ) {
            return delegateFieldType().rangeQuery(lowerTerm, upperTerm, includeLower, includeUpper, context);
        }

        @Override
        public Object valueForDisplay(Object value) {
            return delegateFieldType().valueForDisplay(value);
        }

        @Override
        public DocValueFormat docValueFormat(String format, ZoneId timeZone) {
            return delegateFieldType().docValueFormat(format, timeZone);
        }

        @Override
        public Relation isFieldWithinQuery(
            IndexReader reader,
            Object from,
            Object to,
            boolean includeLower,
            boolean includeUpper,
            ZoneId timeZone,
            DateMathParser dateMathParser,
            QueryRewriteContext context
        ) throws IOException {
            return delegateFieldType().isFieldWithinQuery(reader, from, to, includeLower, includeUpper, timeZone, dateMathParser, context);
        }

        @Override
        public boolean isAggregatable() {
            return true;
        }

        @Override
        public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext) {
            return (cache, breakerService) -> new IndexAggregateMetricDoubleFieldData(
                name(),
                AggregateMetricsValuesSourceType.AGGREGATE_METRIC
            ) {
                @Override
                public LeafAggregateMetricDoubleFieldData load(LeafReaderContext context) {
                    return new LeafAggregateMetricDoubleFieldData() {
                        @Override
                        public SortedNumericDoubleValues getAggregateMetricValues(final Metric metric) {
                            try {
                                final SortedNumericDocValues values = DocValues.getSortedNumeric(
                                    context.reader(),
                                    subfieldName(getFieldName(), metric)
                                );

                                return new SortedNumericDoubleValues() {
                                    @Override
                                    public int docValueCount() {
                                        return values.docValueCount();
                                    }

                                    @Override
                                    public boolean advanceExact(int doc) throws IOException {
                                        return values.advanceExact(doc);
                                    }

                                    @Override
                                    public double nextValue() throws IOException {
                                        long v = values.nextValue();
                                        if (metric == Metric.value_count) {
                                            // Only value_count metrics are encoded as integers
                                            return v;
                                        } else {
                                            // All other metrics are encoded as doubles
                                            return NumericUtils.sortableLongToDouble(v);
                                        }
                                    }
                                };
                            } catch (IOException e) {
                                throw new IllegalStateException("Cannot load doc values", e);
                            }
                        }

                        @Override
                        public DocValuesScriptFieldFactory getScriptFieldFactory(String name) {
                            // getAggregateMetricValues returns all metric as doubles, including `value_count`
                            return new DelegateDocValuesField(
                                new ScriptDocValues.Doubles(new DoublesSupplier(getAggregateMetricValues(defaultMetric))),
                                name
                            );
                        }

                        @Override
                        public SortedBinaryDocValues getBytesValues() {
                            throw new UnsupportedOperationException(
                                "String representation of doc values " + "for [" + CONTENT_TYPE + "] fields is not supported"
                            );
                        }

                        @Override
                        public long ramBytesUsed() {
                            return 0; // Unknown
                        }

                    };
                }

                @Override
                public LeafAggregateMetricDoubleFieldData loadDirect(LeafReaderContext context) {
                    return load(context);
                }

                @Override
                public SortField sortField(
                    Object missingValue,
                    MultiValueMode sortMode,
                    XFieldComparatorSource.Nested nested,
                    boolean reverse
                ) {
                    return new SortedNumericSortField(delegateFieldType().name(), SortField.Type.DOUBLE, reverse);
                }

                @Override
                public BucketedSort newBucketedSort(
                    BigArrays bigArrays,
                    Object missingValue,
                    MultiValueMode sortMode,
                    XFieldComparatorSource.Nested nested,
                    SortOrder sortOrder,
                    DocValueFormat format,
                    int bucketSize,
                    BucketedSort.ExtraData extra
                ) {
                    throw new IllegalArgumentException("Can't sort on the [" + CONTENT_TYPE + "] field");
                }
            };
        }

        @Override
        public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
            return SourceValueFetcher.identity(name(), context, format);
        }

        @Override
        public BlockLoader blockLoader(BlockLoaderContext blContext) {
            BlockLoaderFunctionConfig cfg = blContext.blockLoaderFunctionConfig();
            if (cfg != null) {
                var function = cfg.function();
                Metric metric = switch (function) {
                    case AMD_COUNT -> Metric.value_count;
                    case AMD_MAX -> Metric.max;
                    case AMD_MIN -> Metric.min;
                    case AMD_SUM -> Metric.sum;
                    case AMD_DEFAULT -> defaultMetric;
                    default -> null;
                };
                if (metric == null) {
                    return new AggregateMetricDoubleBlockLoader(metricFields);
                }
                return getIndividualBlockLoader(metric);
            }
            return new AggregateMetricDoubleBlockLoader(metricFields);
        }

        private BlockLoader getIndividualBlockLoader(Metric metric) {
            if (metricFields.containsKey(metric) == false) {
                return ConstantNull.INSTANCE;
            }
            if (metric == Metric.value_count) {
                return new IntsBlockLoader(metricFields.get(Metric.value_count).name());
            }
            return new DoublesBlockLoader(metricFields.get(metric).name(), NumericUtils::sortableLongToDouble);
        }

        @Override
        public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) {
            return switch (config.function()) {
                case AMD_MIN, AMD_MAX, AMD_SUM, AMD_COUNT, AMD_DEFAULT -> true;
                default -> false;
            };
        }

        /**
         * If field is a time series metric field, returns its metric type
         * @return the metric type or null
         */
        public MetricType getMetricType() {
            return metricType;
        }
    }

    private final EnumMap<Metric, NumberFieldMapper> metricFieldMappers;

    private final boolean ignoreMalformed;

    /** A set of metrics supported */
    private final EnumSet<Metric> metrics;

    /** The default metric to be when querying this field type */
    protected Metric defaultMetric;

    /** The metric type (gauge, counter, summary) if  field is a time series metric */
    private final TimeSeriesParams.MetricType metricType;

    private final IndexSettings indexSettings;

    private AggregateMetricDoubleFieldMapper(
        String simpleName,
        MappedFieldType mappedFieldType,
        EnumMap<Metric, NumberFieldMapper> metricFieldMappers,
        BuilderParams builderParams,
        Builder builder
    ) {
        super(simpleName, mappedFieldType, builderParams);
        this.ignoreMalformed = builder.ignoreMalformed.getValue();
        this.metrics = builder.metrics.getValue();
        this.defaultMetric = builder.defaultMetric.getValue();
        this.metricFieldMappers = metricFieldMappers;
        this.metricType = builder.timeSeriesMetric.getValue();
        this.indexSettings = builder.indexSettings;
    }

    @Override
    public boolean ignoreMalformed() {
        return ignoreMalformed;
    }

    Metric defaultMetric() {
        return defaultMetric;
    }

    @Override
    public AggregateMetricDoubleFieldType fieldType() {
        return (AggregateMetricDoubleFieldType) super.fieldType();
    }

    @Override
    protected String contentType() {
        return CONTENT_TYPE;
    }

    @Override
    public Iterator<Mapper> iterator() {
        return Collections.emptyIterator();
    }

    @Override
    protected boolean supportsParsingObject() {
        return true;
    }

    @Override
    protected void parseCreateField(DocumentParserContext context) throws IOException {
        context.path().add(leafName());
        XContentParser.Token token;
        XContentSubParser subParser = null;
        EnumMap<Metric, Number> metricsParsed = new EnumMap<>(Metric.class);
        // Preserves the content of the field in order to be able to construct synthetic source
        // if field value is malformed.
        XContentBuilder malformedDataForSyntheticSource = null;

        try {
            token = context.parser().currentToken();
            if (token == XContentParser.Token.VALUE_NULL) {
                context.path().remove();
                return;
            }
            ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser());
            if (context.mappingLookup().isSourceSynthetic() && ignoreMalformed) {
                var copyingParser = new CopyingXContentParser(context.parser());
                malformedDataForSyntheticSource = copyingParser.getBuilder();
                subParser = new XContentSubParser(copyingParser);
            } else {
                subParser = new XContentSubParser(context.parser());
            }
            token = subParser.nextToken();
            while (token != XContentParser.Token.END_OBJECT) {
                // should be an object sub-field with name a metric name
                ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser);
                String fieldName = subParser.currentName();
                Metric metric = Metric.valueOf(fieldName);

                if (metrics.contains(metric) == false) {
                    throw new IllegalArgumentException(
                        "Aggregate metric [" + metric + "] does not exist in the mapping of field [" + mappedFieldType.name() + "]"
                    );
                }

                token = subParser.nextToken();
                // Make sure that the value is a number. Probably this will change when
                // new aggregate metric types are added (histogram, cardinality etc)
                ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser);
                NumberFieldMapper delegateFieldMapper = metricFieldMappers.get(metric);
                // Delegate parsing the field to a numeric field mapper
                try {
                    Number metricValue = delegateFieldMapper.value(context.parser());
                    metricsParsed.put(metric, metricValue);
                } catch (IllegalArgumentException e) {
                    throw new IllegalArgumentException("failed to parse [" + metric.name() + "] sub field: " + e.getMessage(), e);
                }
                token = subParser.nextToken();
            }

            // check max value must bigger then min value
            Number min = metricsParsed.get(Metric.min);
            Number max = metricsParsed.get(Metric.max);
            if (max != null && min != null && max.doubleValue() < min.doubleValue()) {
                throw new IllegalArgumentException(
                    "Aggregate metric field [" + mappedFieldType.name() + "] max value cannot be smaller than min value"
                );
            }
            Number valueCount = metricsParsed.get(Metric.value_count);
            if (valueCount != null && valueCount.intValue() < 0) {
                throw new IllegalArgumentException(
                    "Aggregate metric [" + Metric.value_count + "] of field [" + mappedFieldType.name() + "] cannot be a negative number"
                );
            }

            // Check if all metrics have been parsed.
            if (metricsParsed.size() != metrics.size()) {
                throw new IllegalArgumentException(
                    "Aggregate metric field [" + mappedFieldType.name() + "] must contain all metrics " + metrics
                );
            }
            // Check that there aren't any duplicates already parsed
            for (Metric m : metricsParsed.keySet()) {
                NumberFieldMapper delegateFieldMapper = metricFieldMappers.get(m);
                if (context.doc().getByKey(delegateFieldMapper.fieldType().name()) != null) {
                    throw new IllegalArgumentException(
                        "Field ["
                            + fullPath()
                            + "] of type ["
                            + typeName()
                            + "] does not support indexing multiple values for the same field in the same document"
                    );
                }
            }
        } catch (Exception e) {
            if (ignoreMalformed) {
                if (subParser != null) {
                    // close the subParser, so we advance to the end of the object
                    subParser.close();
                } else {
                    if (context.mappingLookup().isSourceSynthetic()) {
                        // There is a malformed value, but it is not an object (since subParser is null).
                        // So we just need to copy this single value.
                        malformedDataForSyntheticSource = XContentBuilder.builder(context.parser().contentType().xContent())
                            .copyCurrentStructure(context.parser());
                    }
                }

                if (malformedDataForSyntheticSource != null) {
                    context.doc().add(IgnoreMalformedStoredValues.storedField(fullPath(), malformedDataForSyntheticSource));
                }

                context.addIgnoredField(fullPath());
                context.path().remove();
                return;
            }
            // Rethrow exception as is. It is going to be caught and nested in a MapperParsingException
            // by its FieldMapper#parse()
            throw e;
        }

        for (Map.Entry<Metric, Number> parsed : metricsParsed.entrySet()) {
            NumberFieldMapper delegateFieldMapper = metricFieldMappers.get(parsed.getKey());
            delegateFieldMapper.indexValue(context, parsed.getValue());
        }
        context.path().remove();
    }

    @Override
    public FieldMapper.Builder getMergeBuilder() {
        return new Builder(leafName(), indexSettings).metric(metricType).init(this);
    }

    @Override
    protected SyntheticSourceSupport syntheticSourceSupport() {
        return new SyntheticSourceSupport.Native(
            () -> new CompositeSyntheticFieldLoader(
                leafName(),
                fullPath(),
                new AggregateMetricSyntheticFieldLoader(fullPath(), metrics),
                new CompositeSyntheticFieldLoader.MalformedValuesLayer(fullPath())
            )
        );
    }

    public static class AggregateMetricSyntheticFieldLoader implements CompositeSyntheticFieldLoader.DocValuesLayer {
        private final String name;
        private final EnumSet<Metric> metrics;
        private final Map<Metric, SortedNumericDocValues> metricDocValues = new EnumMap<>(Metric.class);
        private final Set<Metric> metricHasValue = EnumSet.noneOf(Metric.class);

        protected AggregateMetricSyntheticFieldLoader(String name, EnumSet<Metric> metrics) {
            this.name = name;
            this.metrics = metrics;
        }

        @Override
        public String fieldName() {
            return name;
        }

        @Override
        public long valueCount() {
            return hasValue() ? 1 : 0;
        }

        @Override
        public DocValuesLoader docValuesLoader(LeafReader reader, int[] docIdsInLeaf) throws IOException {
            metricDocValues.clear();
            for (Metric m : metrics) {
                String fieldName = subfieldName(name, m);
                SortedNumericDocValues dv = SortedNumericDocValuesSyntheticFieldLoader.docValuesOrNull(reader, fieldName);
                if (dv != null) {
                    metricDocValues.put(m, dv);
                }
            }

            if (metricDocValues.isEmpty()) {
                return null;
            }

            return new AggregateDocValuesLoader();
        }

        @Override
        public boolean hasValue() {
            return metricHasValue.isEmpty() == false;
        }

        @Override
        public void write(XContentBuilder b) throws IOException {
            if (metricHasValue.isEmpty()) {
                return;
            }
            b.startObject();
            for (Map.Entry<Metric, SortedNumericDocValues> entry : metricDocValues.entrySet()) {
                if (metricHasValue.contains(entry.getKey())) {
                    String metricName = entry.getKey().name();
                    long value = entry.getValue().nextValue();
                    if (entry.getKey() == Metric.value_count) {
                        b.field(metricName, value);
                    } else {
                        b.field(metricName, NumericUtils.sortableLongToDouble(value));
                    }
                }
            }
            b.endObject();
        }

        private class AggregateDocValuesLoader implements DocValuesLoader {
            @Override
            public boolean advanceToDoc(int docId) throws IOException {
                // It is required that all defined metrics must exist. In this case
                // it is enough to check for the first docValue. However, in the future
                // we may relax the requirement of all metrics existing. In this case
                // we should check the doc value for each metric separately
                metricHasValue.clear();
                for (Map.Entry<Metric, SortedNumericDocValues> e : metricDocValues.entrySet()) {
                    if (e.getValue().advanceExact(docId)) {
                        metricHasValue.add(e.getKey());
                    }
                }

                return metricHasValue.isEmpty() == false;
            }
        }
    }

}
