/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.reindex.management;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.transport.TransportService;

/**
 * Transport action that fetches a single reindex operation by its task id.
 * <p>
 * The lookup is delegated to the generic get-task API through the injected {@link Client}. A task is only
 * returned if its action name equals {@link ReindexAction#NAME} and it has no parent task; in every other
 * case (including the task not existing at all) the caller receives the same "not found" failure built by
 * {@link #notFoundException(TaskId)}, so that no details about unrelated tasks or reindex slices are exposed.
 */
public class TransportGetReindexAction extends HandledTransportAction<GetReindexRequest, GetReindexResponse> {
    public static final ActionType<GetReindexResponse> TYPE = new ActionType<>("cluster:monitor/reindex/get");

    private static final Logger logger = LogManager.getLogger(TransportGetReindexAction.class);

    private final Client client;

    @Inject
    public TransportGetReindexAction(TransportService transportService, ActionFilters actionFilters, Client client) {
        super(TYPE.name(), transportService, actionFilters, GetReindexRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
        this.client = client;
    }

    /**
     * Resolves the requested reindex task in up to two steps.
     * <ol>
     *   <li>A get-task request with {@code wait_for_completion=false} checks that the task exists and is a
     *       top-level reindex task, so that we never block waiting on a task that is not a reindex.</li>
     *   <li>Only if the caller asked to wait and the task has not completed yet, a second get-task request
     *       with {@code wait_for_completion=true} (and the caller's timeout) is issued.</li>
     * </ol>
     * The task is looked up on the node inferred from the task id while it is running, or in the
     * {@code .tasks} system index once it has completed. The underlying transport get-task action is
     * multi-project aware, so only project specific tasks are returned.
     *
     * @param thisTask the task representing this action's own execution (unused here)
     * @param request  carries the task id, the wait-for-completion flag and the timeout
     * @param listener notified with the reindex task, or with a failure
     */
    @Override
    protected void doExecute(Task thisTask, GetReindexRequest request, ActionListener<GetReindexResponse> listener) {
        final TaskId taskId = request.getTaskId();
        final GetTaskRequest probeRequest = new GetTaskRequest().setTaskId(taskId).setWaitForCompletion(false);

        client.admin().cluster().getTask(probeRequest, new ActionListener<>() {
            @Override
            public void onResponse(GetTaskResponse probeResponse) {
                inspectLookupResult(request, taskId, probeResponse.getTask(), listener);
            }

            @Override
            public void onFailure(Exception e) {
                relayFailure(taskId, e, listener);
            }
        });
    }

    /**
     * Validates the task found by the initial lookup and then either answers the caller right away or
     * goes on to wait for the task to complete.
     */
    private void inspectLookupResult(
        GetReindexRequest request,
        TaskId taskId,
        TaskResult found,
        ActionListener<GetReindexResponse> listener
    ) {
        // A task with this id exists but it is not a reindex: report not found to prevent leaking other tasks
        final String foundAction = found.getTask().action();
        if (ReindexAction.NAME.equals(foundAction) == false) {
            logger.debug("task [{}] requested as reindex but is [{}], returning not found", taskId, foundAction);
            listener.onFailure(notFoundException(taskId));
            return;
        }

        // A reindex task with this id exists but it is a subtask: report not found to hide slicing
        // implementation details
        if (found.getTask().parentTaskId().isSet()) {
            logger.debug("reindex subtask [{}] requested directly, returning not found", taskId);
            listener.onFailure(notFoundException(taskId));
            return;
        }

        // Waiting only makes sense for a task that is still running and only if the caller asked for it
        final boolean mustWait = found.isCompleted() == false && request.getWaitForCompletion();
        if (mustWait) {
            awaitCompletion(request, taskId, listener);
        } else {
            listener.onResponse(new GetReindexResponse(found));
        }
    }

    /**
     * Re-issues the get-task request with {@code wait_for_completion=true} and the caller's timeout, and
     * hands the resulting task to the listener.
     */
    private void awaitCompletion(GetReindexRequest request, TaskId taskId, ActionListener<GetReindexResponse> listener) {
        final GetTaskRequest blockingRequest = new GetTaskRequest().setTaskId(taskId)
            .setWaitForCompletion(true)
            .setTimeout(request.getTimeout());

        client.admin().cluster().getTask(blockingRequest, new ActionListener<>() {
            @Override
            public void onResponse(GetTaskResponse completedResponse) {
                listener.onResponse(new GetReindexResponse(completedResponse.getTask()));
            }

            @Override
            public void onFailure(Exception e) {
                relayFailure(taskId, e, listener);
            }
        });
    }

    /**
     * Forwards a get-task failure to the listener. A {@link ResourceNotFoundException} is replaced by the
     * reindex specific not-found exception to hide task details; any other exception is passed on as is.
     */
    private static void relayFailure(TaskId taskId, Exception e, ActionListener<GetReindexResponse> listener) {
        if (e instanceof ResourceNotFoundException == false) {
            listener.onFailure(e);
            return;
        }
        logger.debug("task [{}] not found, returning as reindex not found", taskId);
        // TODO: Add searching for reallocated running reindex task on other nodes after relocation is added
        listener.onFailure(notFoundException(taskId));
    }

    /**
     * Builds the failure returned whenever the requested id does not resolve to a visible reindex operation.
     * Visible for testing.
     */
    static ResourceNotFoundException notFoundException(TaskId taskId) {
        return new ResourceNotFoundException("Reindex operation [{}] not found", taskId);
    }
}
