/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */
package org.elasticsearch.repositories.s3;

import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.retry.RetryPolicyContext;
import software.amazon.awssdk.core.retry.conditions.RetryCondition;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams;
import software.amazon.awssdk.services.s3.endpoints.internal.DefaultS3EndpointProvider;
import software.amazon.awssdk.services.s3.model.S3Exception;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class S3ServiceTests extends ESTestCase {

    /**
     * Checks the caching behaviour of {@link S3Service}: repeated {@code settings()} lookups for the same repository metadata
     * return the same instance, whereas after {@code onBlobStoreClose()} both {@code client()} and {@code settings()} are
     * expected to hand out fresh instances.
     */
    public void testCachedClientsAreReleased() {
        // try-with-resources so that the service is closed even if one of the assertions below fails,
        // consistent with how the other tests in this class manage the service lifecycle
        try (
            var s3Service = new S3Service(
                mock(Environment.class),
                ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
                TestProjectResolvers.DEFAULT_PROJECT_ONLY,
                mock(ResourceWatcherService.class),
                () -> Region.of("es-test-region")
            )
        ) {
            s3Service.start();

            // both repositories share the same settings and only differ by name
            final Settings settings = Settings.builder().put("endpoint", "http://first").build();
            final RepositoryMetadata metadata1 = new RepositoryMetadata("first", "s3", settings);
            final RepositoryMetadata metadata2 = new RepositoryMetadata("second", "s3", settings);

            // looking up the settings twice for the same metadata must yield the very same instance
            final S3ClientSettings clientSettings = s3Service.settings(ProjectId.DEFAULT, metadata2);
            final S3ClientSettings otherClientSettings = s3Service.settings(ProjectId.DEFAULT, metadata2);
            assertSame(clientSettings, otherClientSettings);

            // once the blob store is closed, a new client reference must be created on the next request
            final AmazonS3Reference reference = s3Service.client(randomFrom(ProjectId.DEFAULT, null), metadata1);
            reference.close();
            s3Service.onBlobStoreClose(ProjectId.DEFAULT);
            final AmazonS3Reference referenceReloaded = s3Service.client(randomFrom(ProjectId.DEFAULT, null), metadata1);
            assertNotSame(referenceReloaded, reference);
            referenceReloaded.close();

            // likewise the derived settings must be rebuilt after the blob store is closed
            s3Service.onBlobStoreClose(ProjectId.DEFAULT);
            final S3ClientSettings clientSettingsReloaded = s3Service.settings(ProjectId.DEFAULT, metadata1);
            assertNotSame(clientSettings, clientSettingsReloaded);
        }
    }

    /**
     * Verifies that {@code S3Service.isInvalidAccessKeyIdException} accepts a 403 response carrying the
     * {@code InvalidAccessKeyId} error code (which the SDK's default retry condition does not retry), and rejects a 403
     * response carrying any other error code.
     */
    public void testRetryOn403RetryPolicy() {
        AwsErrorDetails awsErrorDetails = AwsErrorDetails.builder().errorCode("InvalidAccessKeyId").build();
        AwsServiceException s3Exception = S3Exception.builder()
            .awsErrorDetails(awsErrorDetails)
            .statusCode(RestStatus.FORBIDDEN.getStatus())
            .build();

        // AWS default retry condition does not retry on 403
        assertFalse(
            RetryCondition.defaultRetryCondition()
                .shouldRetry(RetryPolicyContext.builder().retriesAttempted(between(0, 9)).exception(s3Exception).build())
        );

        // The retryable 403 condition retries on 403 invalid access key id
        assertTrue(
            S3Service.isInvalidAccessKeyIdException(
                RetryPolicyContext.builder().retriesAttempted(between(0, 9)).exception(s3Exception).build().exception()
            )
        );

        // Not retry for 403 with error code that is not invalid access key id. The status code is set explicitly so that this case
        // differs from the positive case above only by its error code; a random 10-character code can never equal the
        // 18-character "InvalidAccessKeyId".
        String errorCode = randomAlphaOfLength(10);
        var exception = S3Exception.builder()
            .awsErrorDetails(AwsErrorDetails.builder().errorCode(errorCode).build())
            .statusCode(RestStatus.FORBIDDEN.getStatus())
            .build();
        var retryPolicyContext = RetryPolicyContext.builder().retriesAttempted(between(0, 9)).exception(exception).build();
        assertFalse(S3Service.isInvalidAccessKeyIdException(retryPolicyContext.exception()));
    }

    /**
     * When the {@code s3.client.CLIENT_NAME.region} setting is present, {@code getClientRegion()} is expected to return exactly
     * that region, and neither {@link S3Service} nor {@link S3DefaultRegionHolder} should log a warning while doing so.
     */
    @TestLogging(reason = "testing WARN log output", value = "org.elasticsearch.repositories.s3.S3Service:WARN")
    public void testGetClientRegionFromSetting() {
        final var defaultRegionLookedUp = new AtomicBoolean();
        try (
            var service = new S3Service(
                mock(Environment.class),
                ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
                TestProjectResolvers.DEFAULT_PROJECT_ONLY,
                mock(ResourceWatcherService.class),
                () -> {
                    // the default region supplier must be consulted at most once
                    assertTrue(defaultRegionLookedUp.compareAndSet(false, true));
                    if (randomBoolean() == false) {
                        return randomFrom(randomFrom(Region.regions()), Region.of(randomIdentifier()));
                    }
                    throw new ElasticsearchException("simulated");
                }
            );
            var logCapture = MockLog.capture(S3Service.class, S3DefaultRegionHolder.class)
        ) {
            // whatever the default region supplier did, it must not surface as a warning in this test
            final var noDefaultRegionWarning = new MockLog.UnseenEventExpectation(
                "no default region warning",
                S3DefaultRegionHolder.class.getCanonicalName(),
                Level.WARN,
                "*"
            );
            logCapture.addExpectation(noDefaultRegionWarning);

            service.start();
            assertTrue(defaultRegionLookedUp.get());

            final String clientName;
            if (randomBoolean()) {
                clientName = "default";
            } else {
                clientName = randomIdentifier();
            }

            final Region configuredRegion;
            if (randomBoolean()) {
                configuredRegion = randomFrom(Region.regions());
            } else {
                configuredRegion = Region.of(randomIdentifier());
            }

            final String serviceLoggerName = S3Service.class.getCanonicalName();
            logCapture.addExpectation(new MockLog.UnseenEventExpectation("no warning", serviceLoggerName, Level.WARN, "*"));
            logCapture.addExpectation(new MockLog.UnseenEventExpectation("no debug", serviceLoggerName, Level.DEBUG, "*"));

            final Settings nodeSettings = Settings.builder().put("s3.client." + clientName + ".region", configuredRegion.id()).build();
            final S3ClientSettings clientSettings = S3ClientSettings.getClientSettings(nodeSettings, clientName);
            assertSame(configuredRegion, service.getClientRegion(clientSettings));

            logCapture.assertAllExpectationsMatched();
        }
    }

    /**
     * When only an endpoint is configured, {@code getClientRegion()} is expected to derive the region from that endpoint and to
     * log a WARN message recommending that the region setting be configured explicitly.
     */
    @TestLogging(reason = "testing WARN log output", value = "org.elasticsearch.repositories.s3.S3Service:WARN")
    public void testGetClientRegionFromEndpointSettingGuess() {
        final var defaultRegionLookedUp = new AtomicBoolean();
        try (
            var service = new S3Service(
                mock(Environment.class),
                ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
                TestProjectResolvers.DEFAULT_PROJECT_ONLY,
                mock(ResourceWatcherService.class),
                () -> {
                    // the default region supplier must be consulted at most once
                    assertTrue(defaultRegionLookedUp.compareAndSet(false, true));
                    if (randomBoolean() == false) {
                        return randomFrom(randomFrom(Region.regions()), Region.of(randomIdentifier()));
                    }
                    throw new ElasticsearchException("simulated");
                }
            );
            var logCapture = MockLog.capture(S3Service.class, S3DefaultRegionHolder.class)
        ) {
            final var noDefaultRegionWarning = new MockLog.UnseenEventExpectation(
                "no default region warning",
                S3DefaultRegionHolder.class.getCanonicalName(),
                Level.WARN,
                "*"
            );
            logCapture.addExpectation(noDefaultRegionWarning);

            service.start();
            assertTrue(defaultRegionLookedUp.get());

            final String clientName;
            if (randomBoolean()) {
                clientName = "default";
            } else {
                clientName = randomIdentifier();
            }

            // global and "-gov-" regions are excluded from the candidates
            final var guessedRegion = randomValueOtherThanMany(
                candidate -> candidate.isGlobalRegion() || candidate.id().contains("-gov-"),
                () -> randomFrom(Region.regions())
            );

            // resolve the SDK's own endpoint for the chosen region, then configure either the full URL or just its host
            final var endpointParams = S3EndpointParams.builder().region(guessedRegion).build();
            final var endpointUrl = safeGet(new DefaultS3EndpointProvider().resolveEndpoint(endpointParams)).url();
            final var endpoint = randomFrom(endpointUrl.toString(), endpointUrl.getHost());

            final String expectedWarning = Strings.format(
                "found S3 client with endpoint [%s] but no configured region, guessing it should use [%s]; "
                    + "to suppress this warning, configure the [s3.client.CLIENT_NAME.region] setting on this node",
                endpoint,
                guessedRegion.id()
            );
            logCapture.addExpectation(
                new MockLog.SeenEventExpectation(
                    endpoint + " -> " + guessedRegion,
                    S3Service.class.getCanonicalName(),
                    Level.WARN,
                    expectedWarning
                )
            );

            final Settings nodeSettings = Settings.builder().put("s3.client." + clientName + ".endpoint", endpoint).build();
            final S3ClientSettings clientSettings = S3ClientSettings.getClientSettings(nodeSettings, clientName);
            assertEquals(endpoint, guessedRegion, service.getClientRegion(clientSettings));

            logCapture.assertAllExpectationsMatched();
        }
    }

    /**
     * With neither a region nor an endpoint configured, {@code getClientRegion()} is expected to return the region produced by
     * the default region supplier and to report that choice at DEBUG level.
     */
    @TestLogging(reason = "testing DEBUG log output", value = "org.elasticsearch.repositories.s3.S3Service:DEBUG")
    public void testGetClientRegionFromDefault() {
        final var defaultRegionLookedUp = new AtomicBoolean();
        final Region sdkDefaultRegion;
        if (randomBoolean()) {
            sdkDefaultRegion = randomFrom(Region.regions());
        } else {
            sdkDefaultRegion = Region.of(randomIdentifier());
        }
        try (
            var service = new S3Service(
                mock(Environment.class),
                ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
                TestProjectResolvers.DEFAULT_PROJECT_ONLY,
                mock(ResourceWatcherService.class),
                () -> {
                    // the default region supplier must be consulted at most once
                    assertTrue(defaultRegionLookedUp.compareAndSet(false, true));
                    return sdkDefaultRegion;
                }
            );
            var logCapture = MockLog.capture(S3Service.class, S3DefaultRegionHolder.class)
        ) {
            final var noDefaultRegionWarning = new MockLog.UnseenEventExpectation(
                "no default region warning",
                S3DefaultRegionHolder.class.getCanonicalName(),
                Level.WARN,
                "*"
            );
            logCapture.addExpectation(noDefaultRegionWarning);

            service.start();
            assertTrue(defaultRegionLookedUp.get());

            final String clientName;
            if (randomBoolean()) {
                clientName = "default";
            } else {
                clientName = randomIdentifier();
            }

            final String expectedMessage = "found S3 client with no configured region and no configured endpoint, using region ["
                + sdkDefaultRegion.id()
                + "] from SDK";
            logCapture.addExpectation(
                new MockLog.SeenEventExpectation("warning", S3Service.class.getCanonicalName(), Level.DEBUG, expectedMessage)
            );

            final S3ClientSettings clientSettings = S3ClientSettings.getClientSettings(Settings.EMPTY, clientName);
            assertSame(sdkDefaultRegion, service.getClientRegion(clientSettings));

            logCapture.assertAllExpectationsMatched();
        }
    }

    /**
     * With neither a region nor an endpoint configured and a failing default region supplier, {@code getClientRegion()} is
     * expected to return {@code null} while logging both the supplier failure and the fallback warning.
     */
    @TestLogging(reason = "testing WARN log output", value = "org.elasticsearch.repositories.s3.S3Service:WARN")
    public void testGetClientRegionFallbackToUsEast1() {
        final var defaultRegionLookedUp = new AtomicBoolean();
        final var simulatedFailureMessage = randomIdentifier();
        try (
            var service = new S3Service(
                mock(Environment.class),
                ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
                TestProjectResolvers.DEFAULT_PROJECT_ONLY,
                mock(ResourceWatcherService.class),
                () -> {
                    // the default region supplier must be consulted at most once, and always fails in this test
                    assertTrue(defaultRegionLookedUp.compareAndSet(false, true));
                    throw new ElasticsearchException(simulatedFailureMessage);
                }
            );
            var logCapture = MockLog.capture(S3Service.class, S3DefaultRegionHolder.class)
        ) {
            service.start();
            assertTrue(defaultRegionLookedUp.get());

            final String clientName;
            if (randomBoolean()) {
                clientName = "default";
            } else {
                clientName = randomIdentifier();
            }

            final var providerChainFailure = new MockLog.SeenEventExpectation(
                "default provider chain failure",
                S3DefaultRegionHolder.class.getCanonicalName(),
                Level.WARN,
                "failed to obtain region from default provider chain"
            ) {
                @Override
                public void match(LogEvent event) {
                    // only count events which carry the exception thrown by the supplier above
                    final Throwable thrown = event.getThrown();
                    if (thrown instanceof ElasticsearchException == false) {
                        return;
                    }
                    if (simulatedFailureMessage.equals(thrown.getMessage()) == false) {
                        return;
                    }
                    super.match(event);
                }
            };
            logCapture.addExpectation(providerChainFailure);

            final String fallbackWarning = "found S3 client with no configured region and no configured endpoint, "
                + "falling back to [us-east-1] and enabling cross-region access; "
                + "to suppress this warning, configure the [s3.client.CLIENT_NAME.region] setting on this node";
            logCapture.addExpectation(
                new MockLog.SeenEventExpectation("warning", S3Service.class.getCanonicalName(), Level.WARN, fallbackWarning)
            );

            final S3ClientSettings clientSettings = S3ClientSettings.getClientSettings(Settings.EMPTY, clientName);
            assertNull(service.getClientRegion(clientSettings));

            logCapture.assertAllExpectationsMatched();
        }
    }

    /**
     * An endpoint configured without a scheme, and without a protocol setting, is expected to yield an {@code https} endpoint
     * override.
     */
    public void testEndpointOverrideSchemeDefaultsToHttpsWhenNotSpecified() {
        final var schemelessEndpoint = randomIdentifier() + ".ignore";
        final var clientName = randomIdentifier();

        final Settings.Builder nodeSettings = Settings.builder().put("s3.client." + clientName + ".endpoint", schemelessEndpoint);
        final URI expectedUri = URI.create("https://" + schemelessEndpoint);

        assertThat(getEndpointUri(nodeSettings, clientName), equalTo(expectedUri));
    }

    /**
     * An endpoint configured without a scheme, combined with the {@code https} protocol setting, is expected to yield an
     * {@code https} endpoint override and a deprecation warning for the protocol setting.
     */
    public void testEndpointOverrideSchemeUsesHttpsIfHttpsProtocolSpecified() {
        final var schemelessEndpoint = randomIdentifier() + ".ignore";
        final var clientName = randomIdentifier();

        final String settingPrefix = "s3.client." + clientName;
        final Settings.Builder nodeSettings = Settings.builder()
            .put(settingPrefix + ".endpoint", schemelessEndpoint)
            .put(settingPrefix + ".protocol", "https");
        final URI expectedUri = URI.create("https://" + schemelessEndpoint);

        assertThat(getEndpointUri(nodeSettings, clientName), equalTo(expectedUri));

        // using the protocol setting at all must trigger its deprecation warning
        final String expectedDeprecation = Strings.format(
            "[s3.client.%s.protocol] setting was deprecated in Elasticsearch and will be removed in a future release. "
                + "See the breaking changes documentation for the next major version.",
            clientName
        );
        assertWarnings(expectedDeprecation);
    }

    /**
     * An endpoint configured without a scheme, combined with the {@code http} protocol setting, is expected to yield an
     * {@code http} endpoint override and a deprecation warning for the protocol setting.
     */
    public void testEndpointOverrideSchemeUsesHttpIfHttpProtocolSpecified() {
        final var schemelessEndpoint = randomIdentifier() + ".ignore";
        final var clientName = randomIdentifier();

        final String settingPrefix = "s3.client." + clientName;
        final Settings.Builder nodeSettings = Settings.builder()
            .put(settingPrefix + ".endpoint", schemelessEndpoint)
            .put(settingPrefix + ".protocol", "http");
        final URI expectedUri = URI.create("http://" + schemelessEndpoint);

        assertThat(getEndpointUri(nodeSettings, clientName), equalTo(expectedUri));

        // using the protocol setting at all must trigger its deprecation warning
        final String expectedDeprecation = Strings.format(
            "[s3.client.%s.protocol] setting was deprecated in Elasticsearch and will be removed in a future release. "
                + "See the breaking changes documentation for the next major version.",
            clientName
        );
        assertWarnings(expectedDeprecation);
    }

    /**
     * Builds an S3 client for the named client from the given node settings, using a freshly started {@link S3Service} and a
     * mocked HTTP client, and returns the endpoint override that the client was configured with.
     *
     * @param settings   builder holding the node settings from which the client settings are read
     * @param clientName name of the S3 client whose settings should be used
     * @return the endpoint override of the resulting client
     * @throws AssertionError if the resulting client has no endpoint override
     */
    private URI getEndpointUri(Settings.Builder settings, String clientName) {
        try (
            var s3Service = new S3Service(
                mock(Environment.class),
                ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
                TestProjectResolvers.DEFAULT_PROJECT_ONLY,
                mock(ResourceWatcherService.class),
                () -> Region.of(randomIdentifier())
            )
        ) {
            s3Service.start();
            final var clientSettings = S3ClientSettings.getClientSettings(settings.build(), clientName);
            // the client is a closeable SDK resource too, so release it as soon as its configuration has been inspected
            try (var s3Client = s3Service.buildClient(clientSettings, mock(SdkHttpClient.class))) {
                return s3Client.serviceClientConfiguration()
                    .endpointOverride()
                    .orElseThrow(() -> new AssertionError("expected an endpoint override for S3 client [" + clientName + "]"));
            }
        }
    }
}
