/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.get;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.IgnoredFieldMapper;
import org.elasticsearch.index.mapper.InferenceMetadataFieldsMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MultiEngineGet;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchFieldsContext;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.lookup.SourceFilter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.elasticsearch.index.IndexSettings.INDEX_MAPPING_EXCLUDE_SOURCE_VECTORS_SETTING;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
 * Shard-level service that serves get-by-id requests for a single {@link IndexShard} and keeps statistics about them.
 */
public final class ShardGetService extends AbstractIndexShardComponent {
    // Gives access to the mappings, field types and index metadata used while building a result.
    private final MapperService mapperService;
    // Count and cumulative elapsed nanoseconds of gets that found a document.
    private final MeanMetric existsMetric = new MeanMetric();
    // Count and cumulative elapsed nanoseconds of gets that did not find a document.
    private final MeanMetric missingMetric = new MeanMetric();
    // Number of gets currently in flight: incremented on entry and decremented in a finally block.
    private final CounterMetric currentMetric = new CounterMetric();
    // The shard whose engine is queried.
    private final IndexShard indexShard;
    // Supplies the source-field metrics handed to the source loaders.
    private final MapperMetrics mapperMetrics;

    /**
     * Creates the get service for one shard.
     *
     * @param indexSettings settings of the index the shard belongs to
     * @param indexShard    shard whose engine is queried; also supplies the shard id
     * @param mapperService mapping access used when loading fields and source
     * @param mapperMetrics metrics holder handed to the source loaders
     */
    public ShardGetService(IndexSettings indexSettings, IndexShard indexShard, MapperService mapperService, MapperMetrics mapperMetrics) {
        super(indexShard.shardId(), indexSettings);
        this.indexShard = indexShard;
        this.mapperMetrics = mapperMetrics;
        this.mapperService = mapperService;
    }

    /**
     * Builds a snapshot of the get statistics collected by this service.
     * Elapsed times are accumulated in nanoseconds and reported in milliseconds.
     *
     * @return the current exists/missing counts and times plus the number of gets in flight
     */
    public GetStats stats() {
        final long existsCount = existsMetric.count();
        final long existsTimeMillis = TimeUnit.NANOSECONDS.toMillis(existsMetric.sum());
        final long missingCount = missingMetric.count();
        final long missingTimeMillis = TimeUnit.NANOSECONDS.toMillis(missingMetric.sum());
        final long inFlight = currentMetric.count();
        return new GetStats(existsCount, existsTimeMillis, missingCount, missingTimeMillis, inFlight);
    }

    /**
     * Gets a document by id without a routing value and without a split shard count summary.
     * Delegates to the routing-aware overload, passing {@code null} routing and {@link SplitShardCountSummary#UNSET}.
     *
     * @deprecated prefer the overload that accepts a routing value and a {@link SplitShardCountSummary}
     */
    @Deprecated
    public GetResult get(
        String id,
        String[] gFields,
        boolean realtime,
        long version,
        VersionType versionType,
        FetchSourceContext fetchSourceContext,
        boolean forceSyntheticSource
    ) throws IOException {
        final String noRouting = null;
        final SplitShardCountSummary noSummary = SplitShardCountSummary.UNSET;
        return get(id, noRouting, gFields, realtime, version, versionType, fetchSourceContext, forceSyntheticSource, noSummary);
    }

    /**
     * Gets a document by id through the shard's engine, with no sequence number or primary term precondition.
     *
     * @param id                     id of the document
     * @param routing                routing value of the request, may be {@code null}
     * @param gFields                stored fields to return, may be {@code null}
     * @param realtime               whether the get should be realtime
     * @param version                expected version
     * @param versionType            how {@code version} is interpreted
     * @param fetchSourceContext     source fetching options, may be {@code null}
     * @param forceSyntheticSource   whether to force a synthetic source loader
     * @param splitShardCountSummary summary the coordinator routed the request with, or {@link SplitShardCountSummary#UNSET}
     */
    public GetResult get(
        String id,
        String routing,
        String[] gFields,
        boolean realtime,
        long version,
        VersionType versionType,
        FetchSourceContext fetchSourceContext,
        boolean forceSyntheticSource,
        SplitShardCountSummary splitShardCountSummary
    ) throws IOException {
        final Function<Engine.Get, Engine.GetResult> engineLookup = indexShard::get;
        final long anySeqNo = UNASSIGNED_SEQ_NO;
        final long anyPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
        return doGet(
            id,
            routing,
            gFields,
            realtime,
            version,
            versionType,
            anySeqNo,
            anyPrimaryTerm,
            fetchSourceContext,
            forceSyntheticSource,
            splitShardCountSummary,
            engineLookup
        );
    }

    /**
     * Gets a document by id as part of a multi-get, resolving it through the supplied {@link MultiEngineGet}.
     * No routing value, sequence number precondition or split shard count summary is applied.
     */
    public GetResult mget(
        String id,
        String[] gFields,
        boolean realtime,
        long version,
        VersionType versionType,
        FetchSourceContext fetchSourceContext,
        boolean forceSyntheticSource,
        MultiEngineGet mget
    ) throws IOException {
        final Function<Engine.Get, Engine.GetResult> engineLookup = mget::get;
        final String noRouting = null;
        final SplitShardCountSummary noSummary = SplitShardCountSummary.UNSET;
        return doGet(
            id,
            noRouting,
            gFields,
            realtime,
            version,
            versionType,
            UNASSIGNED_SEQ_NO,
            UNASSIGNED_PRIMARY_TERM,
            fetchSourceContext,
            forceSyntheticSource,
            noSummary,
            engineLookup
        );
    }

    /**
     * Shared implementation behind all id-based gets: runs the engine lookup, loads the result, records the exists/missing
     * metrics and, for a miss, checks whether the request was routed with stale resharding information.
     *
     * @param id                     id of the document
     * @param routing                routing value of the request, may be {@code null}; only used to recompute the owning shard
     * @param gFields                stored fields to return, may be {@code null}
     * @param realtime               whether the engine get should be realtime
     * @param version                expected version
     * @param versionType            how {@code version} is interpreted
     * @param ifSeqNo                sequence number precondition
     * @param ifPrimaryTerm          primary term precondition
     * @param fetchSourceContext     source fetching options, may be {@code null}
     * @param forceSyntheticSource   whether to force a synthetic source loader
     * @param splitShardCountSummary summary the coordinator routed with, or {@link SplitShardCountSummary#UNSET}
     * @param engineGetOperator      performs the actual engine lookup; may return {@code null}
     * @return the result, or {@code null} if the engine operator returned {@code null}
     * @throws ElasticsearchStatusException with {@link RestStatus#SERVICE_UNAVAILABLE} if the document was not found and,
     *                                      according to current metadata, no longer routes to this shard
     */
    private GetResult doGet(
        String id,
        String routing,
        String[] gFields,
        boolean realtime,
        long version,
        VersionType versionType,
        long ifSeqNo,
        long ifPrimaryTerm,
        FetchSourceContext fetchSourceContext,
        boolean forceSyntheticSource,
        SplitShardCountSummary splitShardCountSummary,
        Function<Engine.Get, Engine.GetResult> engineGetOperator
    ) throws IOException {
        currentMetric.inc();
        final long now = System.nanoTime();
        try {
            var engineGet = new Engine.Get(realtime, realtime, id).version(version)
                .versionType(versionType)
                .setIfSeqNo(ifSeqNo)
                .setIfPrimaryTerm(ifPrimaryTerm);

            final GetResult getResult;
            try (Engine.GetResult get = engineGetOperator.apply(engineGet)) {
                if (get == null) {
                    getResult = null;
                } else if (get.exists() == false) {
                    getResult = new GetResult(
                        shardId.getIndexName(),
                        id,
                        UNASSIGNED_SEQ_NO,
                        UNASSIGNED_PRIMARY_TERM,
                        -1,
                        false,
                        null,
                        null,
                        null
                    );
                } else {
                    // break between having loaded it from translog (so we only have _source), and having a document to load
                    getResult = innerGetFetch(
                        id,
                        gFields,
                        normalizeFetchSourceContent(fetchSourceContext, gFields),
                        get,
                        forceSyntheticSource
                    );
                }
            }

            if (getResult != null && getResult.isExists()) {
                existsMetric.inc(System.nanoTime() - now);
                return getResult;
            }
            missingMetric.inc(System.nanoTime() - now);

            // during resharding, a coordinating node may route a get request to a shard that is the source of a split
            // before the target shard has taken over that document, but by the time the request is processed on the
            // source shard, handoff has occurred. If the get succeeds, this is fine - we block refreshes during this
            // period so although indexing may have occurred on the target shard, the source shard's copy is still valid
            // because refresh hasn't happened. However, if the source shard doesn't have the document in this case, it
            // may be that it is because the shard has deleted unowned documents after the split. Normally this shouldn't
            // occur because we will delay deleting those documents for some grace period, but if it does happen we should
            // fail the request rather than possibly incorrectly reporting that the document doesn't exist, when it may
            // in fact be present on a different shard.
            // We can defer this check until we know whether the response is missing, so that the common case doesn't
            // have to resolve the current split shard summary. This way also means that a race between the get and handoff state
            // can only cause us to throw an exception unnecessarily, rather than incorrectly returning a missing document.
            if (splitShardCountSummary.equals(SplitShardCountSummary.UNSET)) {
                // TODO, this should only be possible temporarily, until we've ensured that all callers provide a valid summary.
                // Checked before touching the index metadata so callers without a summary never pay for resolving it.
                return getResult;
            }
            final var indexMetadata = mapperService.getIndexSettings().getIndexMetadata();
            final var currentSummary = SplitShardCountSummary.forSearch(indexMetadata, shardId().getId());
            if (splitShardCountSummary.compareTo(currentSummary) >= 0) {
                // coordinator is current, so response is valid
                return getResult;
            }
            // Otherwise, recompute the route of the requested document based on current metadata and fail the request if it
            // doesn't map to this shard anymore, since delete unowned may have removed it by this point. This is conservative:
            // the document may genuinely not have existed, but unless we can be certain that delete-unowned hasn't run yet
            // (which is difficult, because the index shard may see the target move to done before the search shard) it is safer
            // to fail the request.
            final var indexRouting = IndexRouting.fromIndexMetadata(indexMetadata);
            final var docShard = indexRouting.getShard(id, routing);
            if (docShard != shardId().getId()) {
                // XXX we may want a more specific exception type here
                throw new ElasticsearchStatusException("stale get request for document [" + id + "]", RestStatus.SERVICE_UNAVAILABLE);
            }
            return getResult;
        } finally {
            currentMetric.dec();
        }
    }

    /**
     * Gets a document by id using {@link IndexShard#getFromTranslog} as the engine lookup.
     * No routing value, sequence number precondition or split shard count summary is applied.
     *
     * @return the result, or {@code null} when the engine lookup itself returns {@code null}
     */
    public GetResult getFromTranslog(
        String id,
        String[] gFields,
        boolean realtime,
        long version,
        VersionType versionType,
        FetchSourceContext fetchSourceContext,
        boolean forceSyntheticSource
    ) throws IOException {
        final Function<Engine.Get, Engine.GetResult> translogLookup = indexShard::getFromTranslog;
        final String noRouting = null;
        final SplitShardCountSummary noSummary = SplitShardCountSummary.UNSET;
        return doGet(
            id,
            noRouting,
            gFields,
            realtime,
            version,
            versionType,
            UNASSIGNED_SEQ_NO,
            UNASSIGNED_PRIMARY_TERM,
            fetchSourceContext,
            forceSyntheticSource,
            noSummary,
            translogLookup
        );
    }

    /**
     * Realtime get used on the update path: matches any version, applies the given sequence number and primary term
     * preconditions, and requests the {@code _routing} stored field alongside the source.
     *
     * @param id                 id of the document
     * @param ifSeqNo            sequence number precondition
     * @param ifPrimaryTerm      primary term precondition
     * @param fetchSourceContext source fetching options, may be {@code null}
     */
    public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) throws IOException {
        final String[] routingOnly = { RoutingFieldMapper.NAME };
        final Function<Engine.Get, Engine.GetResult> engineLookup = indexShard::get;
        final boolean realtime = true;
        final boolean forceSyntheticSource = false;
        return doGet(
            id,
            null,
            routingOnly,
            realtime,
            Versions.MATCH_ANY,
            VersionType.INTERNAL,
            ifSeqNo,
            ifPrimaryTerm,
            fetchSourceContext,
            forceSyntheticSource,
            SplitShardCountSummary.UNSET,
            engineLookup
        );
    }

    /**
     * Converts an already obtained {@link org.elasticsearch.index.engine.Engine.GetResult} into a {@link GetResult} by
     * loading the requested fields of the associated document.
     * This method loads the fields from the Lucene index and not from the transaction log and therefore isn't realtime.
     * <p>
     * Note: the caller <b>must</b> release the engine searcher associated with {@code engineGetResult}!
     *
     * @param engineGetResult    result of an earlier engine lookup
     * @param id                 id of the document
     * @param fields             stored fields to return, may be {@code null}
     * @param fetchSourceContext source fetching options, may be {@code null}
     */
    public GetResult get(Engine.GetResult engineGetResult, String id, String[] fields, FetchSourceContext fetchSourceContext)
        throws IOException {
        if (engineGetResult.exists()) {
            currentMetric.inc();
            try {
                final long startNanos = System.nanoTime();
                final FetchSourceContext effectiveContext = normalizeFetchSourceContent(fetchSourceContext, fields);
                final GetResult fetched = innerGetFetch(id, fields, effectiveContext, engineGetResult, false);
                // A missing result shouldn't happen here, but is still accounted for.
                final MeanMetric metric = fetched.isExists() ? existsMetric : missingMetric;
                metric.inc(System.nanoTime() - startNanos);
                return fetched;
            } finally {
                currentMetric.dec();
            }
        }
        // Nothing to load: report a missing document without touching the metrics.
        return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
    }

    /**
     * Resolves the effective, never-null {@link FetchSourceContext} for a request.
     * An explicit context always wins. Without one, the source is fetched when no stored fields were requested or when
     * {@code _source} is among the requested fields, and is skipped otherwise.
     */
    private static FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceContext context, @Nullable String[] gFields) {
        if (context != null) {
            return context;
        }
        final boolean wantsSource = gFields == null || Arrays.asList(gFields).contains(SourceFieldMapper.NAME);
        return wantsSource ? FetchSourceContext.FETCH_SOURCE : FetchSourceContext.DO_NOT_FETCH_SOURCE;
    }

    /**
     * Loads the requested stored fields, metadata fields and (optionally filtered) {@code _source} of a document that the
     * engine has already located, and packages them as a {@link GetResult}.
     *
     * @param id                   id of the document, used in the result and in error messages
     * @param storedFields         names of the stored fields to return, or {@code null} for none
     * @param fetchSourceContext   non-null source fetching options
     * @param get                  engine result; must refer to an existing document
     * @param forceSyntheticSource whether to use a synthetic source loader instead of the one chosen by the mapping lookup
     * @throws IllegalArgumentException if a requested stored field is known to be an object field
     * @throws ElasticsearchException   if positioning the stored field loader on the document fails
     */
    private GetResult innerGetFetch(
        String id,
        String[] storedFields,
        FetchSourceContext fetchSourceContext,
        Engine.GetResult get,
        boolean forceSyntheticSource
    ) throws IOException {
        assert get.exists() : "method should only be called if document could be retrieved";
        // Capture the mapping lookup once; every mapping-based decision below uses this same instance.
        final MappingLookup mappingLookup = mapperService.mappingLookup();
        // check first if stored fields to be loaded don't contain an object field
        final Set<String> storedFieldSet = new HashSet<>();
        if (storedFields != null) {
            for (String field : storedFields) {
                Mapper fieldMapper = mappingLookup.getMapper(field);
                // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
                if (fieldMapper == null && mappingLookup.objectMappers().get(field) != null) {
                    throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
                }
                storedFieldSet.add(field);
            }
        }

        Map<String, DocumentField> documentFields = null;
        Map<String, DocumentField> metadataFields = null;
        final DocIdAndVersion docIdAndVersion = get.docIdAndVersion();

        // v1 is the fetch source context to use from here on (possibly with extra excludes), v2 the loader-level filter.
        var res = maybeExcludeVectorFields(mappingLookup, indexSettings, fetchSourceContext, null);
        fetchSourceContext = res.v1();

        if (mappingLookup.inferenceFields().isEmpty() == false && shouldExcludeInferenceFieldsFromSource(fetchSourceContext) == false) {
            storedFieldSet.add(InferenceMetadataFieldsMapper.NAME);
        }

        var sourceFilter = res.v2();
        SourceLoader loader = forceSyntheticSource
            ? new SourceLoader.Synthetic(
                sourceFilter,
                () -> mappingLookup.getMapping().syntheticFieldLoader(sourceFilter),
                mapperMetrics.sourceFieldMetrics(),
                mappingLookup.getMapping().ignoredSourceFormat()
            )
            : mappingLookup.newSourceLoader(sourceFilter, mapperMetrics.sourceFieldMetrics());
        // NOTE: buildStoredFieldLoader adds the loader's required stored fields to storedFieldSet when the source is fetched.
        StoredFieldLoader storedFieldLoader = buildStoredFieldLoader(storedFieldSet, fetchSourceContext, loader);
        LeafStoredFieldLoader leafStoredFieldLoader = storedFieldLoader.getLoader(docIdAndVersion.reader.getContext(), null);
        try {
            leafStoredFieldLoader.advanceTo(docIdAndVersion.docId);
        } catch (IOException e) {
            throw new ElasticsearchException("Failed to get id [" + id + "]", e);
        }

        final boolean supportDocValuesForIgnoredMetaField = indexSettings.getIndexVersionCreated()
            .onOrAfter(IndexVersions.DOC_VALUES_FOR_IGNORED_META_FIELD);

        // put stored fields into result objects
        if (leafStoredFieldLoader.storedFields().isEmpty() == false) {
            // Only fields the caller asked for are returned, not the ones loaded solely to rebuild the source.
            Set<String> needed = new HashSet<>();
            if (storedFields != null) {
                Collections.addAll(needed, storedFields);
            }
            needed.add(RoutingFieldMapper.NAME); // we always return _routing if we see it, even if you don't ask for it.....
            documentFields = new HashMap<>();
            metadataFields = new HashMap<>();
            for (Map.Entry<String, List<Object>> entry : leafStoredFieldLoader.storedFields().entrySet()) {
                final String fieldName = entry.getKey();
                if (false == needed.contains(fieldName)) {
                    continue;
                }
                if (IgnoredFieldMapper.NAME.equals(fieldName) && supportDocValuesForIgnoredMetaField) {
                    continue; // loaded from doc values below instead
                }
                MappedFieldType ft = mapperService.fieldType(fieldName);
                if (ft == null) {
                    continue;   // user asked for a non-existent field, ignore it
                }
                List<Object> values = entry.getValue().stream().map(ft::valueForDisplay).toList();
                if (mapperService.isMetadataField(fieldName)) {
                    metadataFields.put(fieldName, new DocumentField(fieldName, values));
                } else {
                    documentFields.put(fieldName, new DocumentField(fieldName, values));
                }
            }
        }

        // NOTE: when _ignored is requested via `stored_fields` we need to load it from doc values instead of loading it from stored fields.
        // The _ignored field used to be stored, but as a result of supporting aggregations on it, it moved from using a stored field to
        // using doc values.
        if (supportDocValuesForIgnoredMetaField && storedFields != null && Arrays.asList(storedFields).contains(IgnoredFieldMapper.NAME)) {
            final DocumentField ignoredDocumentField = loadIgnoredMetadataField(docIdAndVersion);
            if (ignoredDocumentField != null) {
                if (metadataFields == null) {
                    metadataFields = new HashMap<>();
                }
                metadataFields.put(IgnoredFieldMapper.NAME, ignoredDocumentField);
            }
        }

        BytesReference sourceBytes = null;
        // Use the lookup captured above rather than re-reading it from the mapper service, so the source-enabled check
        // is made against the same lookup the source loader was created from.
        if (mappingLookup.isSourceEnabled() && fetchSourceContext.fetchSource()) {
            Source source = loader.leaf(docIdAndVersion.reader, new int[] { docIdAndVersion.docId })
                .source(leafStoredFieldLoader, docIdAndVersion.docId);

            SourceFilter filter = fetchSourceContext.filter();
            if (filter != null) {
                source = source.filter(filter);
            }

            if (storedFieldSet.contains(InferenceMetadataFieldsMapper.NAME)) {
                // Adds the inference metadata field from the document fields to the original _source if it has been requested.
                source = addInferenceMetadataFields(mapperService, docIdAndVersion.reader.getContext(), docIdAndVersion.docId, source);
            }
            sourceBytes = source.internalSourceRef();
        }

        return new GetResult(
            shardId.getIndexName(),
            id,
            docIdAndVersion.seqNo,
            docIdAndVersion.primaryTerm,
            get.version(),
            get.exists(),
            sourceBytes,
            documentFields,
            metadataFields
        );
    }

    /**
     * Decides whether vector fields should be left out of the returned source.
     * An explicit choice carried by the {@link FetchSourceContext} takes precedence; when there is none, the value of
     * {@code INDEX_MAPPING_EXCLUDE_SOURCE_VECTORS_SETTING} for the index is used.
     *
     * @param indexSettings      settings of the index, consulted only when the context makes no explicit choice
     * @param fetchSourceContext source fetching options, may be {@code null}
     * @return {@code true} if vector fields should be excluded from the source
     */
    public static boolean shouldExcludeVectorsFromSource(IndexSettings indexSettings, FetchSourceContext fetchSourceContext) {
        final Boolean requested = shouldExcludeVectorsFromSourceExplicit(fetchSourceContext);
        if (requested != null) {
            return requested;
        }
        return INDEX_MAPPING_EXCLUDE_SOURCE_VECTORS_SETTING.get(indexSettings.getSettings());
    }

    /**
     * Returns the vector exclusion flag carried by the context, or {@code null} when there is no context.
     */
    private static Boolean shouldExcludeVectorsFromSourceExplicit(FetchSourceContext fetchSourceContext) {
        if (fetchSourceContext == null) {
            return null;
        }
        return fetchSourceContext.excludeVectors();
    }

    /**
     * Decides whether the inference metadata field should be left out of the returned source.
     * The checks are applied in this order:
     * <ol>
     *   <li>no context, or source fetching disabled: exclude;</li>
     *   <li>the source filter filters the field out: exclude;</li>
     *   <li>the source filter explicitly includes the field: keep;</li>
     *   <li>the context carries an explicit flag: follow it;</li>
     *   <li>otherwise: exclude.</li>
     * </ol>
     *
     * @param fetchSourceContext source fetching options, may be {@code null}
     * @return {@code true} if the inference metadata field should be excluded
     */
    public static boolean shouldExcludeInferenceFieldsFromSource(FetchSourceContext fetchSourceContext) {
        if (fetchSourceContext == null) {
            // We always default to excluding the inference metadata field, unless the fetch source context says otherwise
            return true;
        }
        if (fetchSourceContext.fetchSource() == false) {
            // Source is disabled
            return true;
        }

        final SourceFilter sourceFilter = fetchSourceContext.filter();
        if (sourceFilter != null) {
            if (sourceFilter.isPathFiltered(InferenceMetadataFieldsMapper.NAME, true)) {
                return true;
            }
            if (sourceFilter.isExplicitlyIncluded(InferenceMetadataFieldsMapper.NAME)) {
                return false;
            }
        }

        final Boolean requested = shouldExcludeInferenceFieldsFromSourceExplicit(fetchSourceContext);
        // No explicit flag means falling back to the default, which is to exclude.
        return requested == null || requested;
    }

    /**
     * Returns the inference-field exclusion flag carried by the context, or {@code null} when there is no context.
     */
    private static Boolean shouldExcludeInferenceFieldsFromSourceExplicit(FetchSourceContext fetchSourceContext) {
        if (fetchSourceContext == null) {
            return null;
        }
        return fetchSourceContext.excludeInferenceFields();
    }

    /**
     * Works out how vector fields should be kept out of the returned {@code _source}.
     * <p>
     * The result is a tuple of:
     * <ul>
     *   <li>the {@link FetchSourceContext} to use: the one passed in, or a new one whose excludes additionally list the
     *       vector fields matched by {@code fetchFieldsContext};</li>
     *   <li>a {@link SourceFilter} excluding the remaining vector fields, or {@code null} when there is nothing to exclude.</li>
     * </ul>
     * Vector fields that the source filter explicitly includes and does not filter out are kept. Vector fields whose name
     * starts with the name of an inference field are not added to the filter.
     * When vectors are not to be excluded at all, the original context is returned together with a {@code null} filter.
     *
     * @param mappingLookup      mappings used to find vector and inference fields
     * @param indexSettings      index settings, consulted for the default vector exclusion behaviour
     * @param fetchSourceContext source fetching options, may be {@code null}
     * @param fetchFieldsContext fields requested through the {@code fields} option, may be {@code null}
     * @return the fetch source context to use and the source filter to apply, as described above
     */
    public static Tuple<FetchSourceContext, SourceFilter> maybeExcludeVectorFields(
        MappingLookup mappingLookup,
        IndexSettings indexSettings,
        FetchSourceContext fetchSourceContext,
        FetchFieldsContext fetchFieldsContext
    ) {
        if (shouldExcludeVectorsFromSource(indexSettings, fetchSourceContext) == false) {
            return Tuple.tuple(fetchSourceContext, null);
        }
        var fetchFieldsAut = fetchFieldsContext != null && fetchFieldsContext.fields().isEmpty() == false
            ? new CharacterRunAutomaton(
                Regex.simpleMatchToAutomaton(fetchFieldsContext.fields().stream().map(f -> f.field).toArray(String[]::new))
            )
            : null;
        var inferenceFieldsAut = mappingLookup.inferenceFields().isEmpty() == false
            ? new CharacterRunAutomaton(
                Regex.simpleMatchToAutomaton(mappingLookup.inferenceFields().keySet().stream().map(f -> f + "*").toArray(String[]::new))
            )
            : null;

        SourceFilter filter = fetchSourceContext != null ? fetchSourceContext.filter() : null;

        // A plain loop is used because two lists are filled at once; doing that from inside a stream predicate would
        // rely on side effects.
        List<String> excludes = new ArrayList<>();
        List<String> lateExcludes = new ArrayList<>();
        for (MappedFieldType fieldType : mappingLookup.getFullNameToFieldType().values()) {
            if (fieldType.isVectorEmbedding() == false) {
                continue;
            }
            final String name = fieldType.name();
            // Keep the vector fields that are explicitly included and not explicitly excluded
            if (filter != null && filter.isExplicitlyIncluded(name)) {
                if (filter.isPathFiltered(name, false)) {
                    excludes.add(name);
                }
                continue;
            }
            // Fields requested by the `fields` option are excluded late, through the fetch source context
            if (fetchFieldsAut != null && fetchFieldsAut.run(name)) {
                lateExcludes.add(name);
                continue;
            }
            // Vectors of semantic text fields are left out of this filter, as they are processed separately
            if (inferenceFieldsAut == null || inferenceFieldsAut.run(name) == false) {
                excludes.add(name);
            }
        }

        var sourceFilter = excludes.isEmpty() ? null : new SourceFilter(new String[] {}, excludes.toArray(String[]::new));
        if (lateExcludes.isEmpty()) {
            return Tuple.tuple(fetchSourceContext, sourceFilter);
        }

        // Adds the vector fields specified by the `fields` option to the excludes list of the fetch source context.
        // This ensures that vector fields are available to sub-fetch phases, but excluded during the fetch source phase.
        if (fetchSourceContext != null && fetchSourceContext.excludes() != null) {
            lateExcludes.addAll(Arrays.asList(fetchSourceContext.excludes()));
        }
        var newFetchSourceContext = fetchSourceContext == null
            ? FetchSourceContext.of(true, false, null, lateExcludes.toArray(String[]::new))
            : FetchSourceContext.of(
                fetchSourceContext.fetchSource(),
                fetchSourceContext.excludeVectors(),
                fetchSourceContext.excludeInferenceFields(),
                fetchSourceContext.includes(),
                lateExcludes.toArray(String[]::new)
            );
        return Tuple.tuple(newFetchSourceContext, sourceFilter);
    }

    /**
     * Reads the {@code _ignored} metadata field of a document from its sorted-set doc values.
     *
     * @param docIdAndVersion identifies the leaf reader and the document within it
     * @return a {@link DocumentField} holding the values as strings, or {@code null} if the segment has no doc values
     *         for the field or the document has no value
     * @throws IOException if reading the doc values fails
     */
    private static DocumentField loadIgnoredMetadataField(final DocIdAndVersion docIdAndVersion) throws IOException {
        final SortedSetDocValues ignoredDocValues = docIdAndVersion.reader.getContext()
            .reader()
            .getSortedSetDocValues(IgnoredFieldMapper.NAME);
        if (ignoredDocValues == null || ignoredDocValues.advanceExact(docIdAndVersion.docId) == false) {
            return null;
        }
        // Read the per-document value count once, after positioning on the document, instead of on every iteration.
        final int valueCount = ignoredDocValues.docValueCount();
        if (valueCount <= 0) {
            return null;
        }
        final List<Object> ignoredValues = new ArrayList<>(valueCount);
        for (int i = 0; i < valueCount; i++) {
            ignoredValues.add(ignoredDocValues.lookupOrd(ignoredDocValues.nextOrd()).utf8ToString());
        }
        return new DocumentField(IgnoredFieldMapper.NAME, ignoredValues);
    }

    /**
     * Adds the inference metadata field of a document to its source when exactly one value is available.
     *
     * @param mapperService gives access to the mappings and the bit set producer
     * @param readerContext leaf the document lives in
     * @param docId         id of the document within the leaf
     * @param source        source loaded so far
     * @return a new {@link Source} containing the inference metadata field, or {@code source} unchanged when there is no
     *         inference metadata mapper, no inference fields, or not exactly one fetched value
     * @throws IOException if fetching the values fails
     */
    private static Source addInferenceMetadataFields(MapperService mapperService, LeafReaderContext readerContext, int docId, Source source)
        throws IOException {
        // Capture the mapping lookup once so the mapper, the inference-field check and the value fetcher all use it.
        final var mappingLookup = mapperService.mappingLookup();
        var inferenceMetadata = (InferenceMetadataFieldsMapper) mappingLookup.getMapping()
            .getMetadataMapperByName(InferenceMetadataFieldsMapper.NAME);
        if (inferenceMetadata == null || mappingLookup.inferenceFields().isEmpty()) {
            return source;
        }
        var inferenceLoader = inferenceMetadata.fieldType()
            .valueFetcher(mappingLookup, mapperService.getBitSetProducer(), new IndexSearcher(readerContext.reader()));
        inferenceLoader.setNextReader(readerContext);
        List<Object> values = inferenceLoader.fetchValues(source, docId, List.of());
        if (values.size() != 1) {
            return source;
        }
        // NOTE: the value is put into the map returned by source.source() itself, not into a copy.
        var newSource = source.source();
        newSource.put(InferenceMetadataFieldsMapper.NAME, values.get(0));
        return Source.fromMap(newSource, source.sourceContentType());
    }

    /**
     * Creates the stored field loader for a get.
     * When the source is fetched, the stored fields required by the source loader are added to {@code fields}, so the
     * passed set is modified. When the source is not fetched and no fields are requested, an empty loader is returned.
     *
     * @param fields             stored fields to load; may be extended by this method
     * @param fetchSourceContext non-null source fetching options
     * @param loader             source loader whose required stored fields are added when the source is fetched
     */
    private static StoredFieldLoader buildStoredFieldLoader(
        Set<String> fields,
        FetchSourceContext fetchSourceContext,
        SourceLoader loader
    ) {
        final boolean loadSource = fetchSourceContext.fetchSource();
        if (loadSource) {
            fields.addAll(loader.requiredStoredFields());
        } else if (fields.isEmpty()) {
            return StoredFieldLoader.empty();
        }
        return StoredFieldLoader.create(loadSource, fields);
    }
}
