# Copyright (c) 2023-2026, PostgreSQL Global Development Group

# logical decoding on standby : test logical decoding,
# recovery conflict and standby promotion.

use strict;
use warnings FATAL => 'all';

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Time::HiRes qw(usleep);
use Test::More;

if ($ENV{enable_injection_points} ne 'yes')
{
	plan skip_all => 'Injection points not supported by this build';
}

my ($stdout, $stderr, $cascading_stdout, $cascading_stderr, $handle);

my $node_primary = PostgreSQL::Test::Cluster->new('primary');
my $node_standby = PostgreSQL::Test::Cluster->new('standby');
my $node_cascading_standby =
  PostgreSQL::Test::Cluster->new('cascading_standby');
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
my $res;

# Name for the physical slot on primary
my $primary_slotname = 'primary_physical';
my $standby_physical_slotname = 'standby_physical';

# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
# given boolean condition to be true to ensure we've reached a quiescent state.
sub wait_for_xmins
{
	my ($node, $slotname, $check_expr) = @_;

	$node->poll_query_until(
		'postgres', qq[
		SELECT $check_expr
		FROM pg_catalog.pg_replication_slots
		WHERE slot_name = '$slotname';
	]) or die "Timed out waiting for slot xmins to advance";
}

# Create the required logical slots on standby.
sub create_logical_slots
{
	my ($node, $slot_prefix) = @_;

	my $active_slot = $slot_prefix . 'activeslot';
	my $inactive_slot = $slot_prefix . 'inactiveslot';
	$node->create_logical_slot_on_standby($node_primary, qq($inactive_slot),
		'testdb');
	$node->create_logical_slot_on_standby($node_primary, qq($active_slot),
		'testdb');
}

# Drop the logical slots on standby.
sub drop_logical_slots
{
	my ($slot_prefix) = @_;
	my $active_slot = $slot_prefix . 'activeslot';
	my $inactive_slot = $slot_prefix . 'inactiveslot';

	$node_standby->psql('postgres',
		qq[SELECT pg_drop_replication_slot('$inactive_slot')]);
	$node_standby->psql('postgres',
		qq[SELECT pg_drop_replication_slot('$active_slot')]);
}

# Acquire one of the standby logical slots created by create_logical_slots().
# In case wait is true we are waiting for an active pid on the 'activeslot' slot.
# If wait is not true it means we are testing a known failure scenario.
sub make_slot_active
{
	my ($node, $slot_prefix, $wait, $to_stdout, $to_stderr) = @_;
	my $slot_user_handle;

	my $active_slot = $slot_prefix . 'activeslot';
	$slot_user_handle = IPC::Run::start(
		[
			'pg_recvlogical',
			'--dbname' => $node->connstr('testdb'),
			'--slot' => $active_slot,
			'--option' => 'include-xids=0',
			'--option' => 'skip-empty-xacts=1',
			'--file' => '-',
			'--no-loop',
			'--start',
		],
		'>' => $to_stdout,
		'2>' => $to_stderr,
		IPC::Run::timeout($default_timeout));

	if ($wait)
	{
		# make sure activeslot is in use
		$node->poll_query_until('testdb',
			qq[SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '$active_slot' AND active_pid IS NOT NULL)]
		) or die "slot never became active";
	}
	return $slot_user_handle;
}

# Check pg_recvlogical stderr
sub check_pg_recvlogical_stderr
{
	my ($slot_user_handle, $check_stderr) = @_;
	my $return;

	# our client should've terminated in response to the walsender error
	$slot_user_handle->finish;
	$return = $?;
	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
	if ($return)
	{
		like($stderr, qr/$check_stderr/, 'slot has been invalidated');
	}

	return 0;
}

# Check if all the slots on standby are dropped. These include the 'activeslot'
# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
sub check_slots_dropped
{
	my ($slot_prefix, $slot_user_handle) = @_;

	is($node_standby->slot($slot_prefix . 'inactiveslot')->{'slot_type'},
		'', 'inactiveslot on standby dropped');
	is($node_standby->slot($slot_prefix . 'activeslot')->{'slot_type'},
		'', 'activeslot on standby dropped');

	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
}

# Change hot_standby_feedback and check xmin and catalog_xmin values.
sub change_hot_standby_feedback_and_wait_for_xmins
{
	my ($hsf, $invalidated) = @_;

	$node_standby->append_conf(
		'postgresql.conf', qq[
	hot_standby_feedback = $hsf
	]);

	$node_standby->reload;

	if ($hsf && $invalidated)
	{
		# With hot_standby_feedback on, xmin should advance,
		# but catalog_xmin should still remain NULL since there is no logical slot.
		wait_for_xmins($node_primary, $primary_slotname,
			"xmin IS NOT NULL AND catalog_xmin IS NULL");
	}
	elsif ($hsf)
	{
		# With hot_standby_feedback on, xmin and catalog_xmin should advance.
		wait_for_xmins($node_primary, $primary_slotname,
			"xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
	}
	else
	{
		# Both should be NULL since hs_feedback is off
		wait_for_xmins($node_primary, $primary_slotname,
			"xmin IS NULL AND catalog_xmin IS NULL");

	}
}

# Check reason for conflict in pg_replication_slots.
sub check_slots_conflict_reason
{
	my ($slot_prefix, $reason) = @_;

	my $active_slot = $slot_prefix . 'activeslot';
	my $inactive_slot = $slot_prefix . 'inactiveslot';

	$res = $node_standby->safe_psql(
		'postgres', qq(
			 select invalidation_reason from pg_replication_slots where slot_name = '$active_slot' and conflicting;)
	);

	is($res, "$reason", "$active_slot reason for conflict is $reason");

	$res = $node_standby->safe_psql(
		'postgres', qq(
			 select invalidation_reason from pg_replication_slots where slot_name = '$inactive_slot' and conflicting;)
	);

	is($res, "$reason", "$inactive_slot reason for conflict is $reason");
}

# Drop the slots, re-create them, change hot_standby_feedback,
# check xmin and catalog_xmin values, make slot active and reset stat.
sub reactive_slots_change_hfs_and_wait_for_xmins
{
	my ($previous_slot_prefix, $slot_prefix, $hsf, $invalidated) = @_;

	# drop the logical slots
	drop_logical_slots($previous_slot_prefix);

	# create the logical slots
	create_logical_slots($node_standby, $slot_prefix);

	change_hot_standby_feedback_and_wait_for_xmins($hsf, $invalidated);

	$handle =
	  make_slot_active($node_standby, $slot_prefix, 1, \$stdout, \$stderr);

	# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
	$node_standby->psql('testdb', q[select pg_stat_reset();]);
}

# Check invalidation in the logfile and in pg_stat_database_conflicts
sub check_for_invalidation
{
	my ($slot_prefix, $log_start, $test_name) = @_;

	my $active_slot = $slot_prefix . 'activeslot';
	my $inactive_slot = $slot_prefix . 'inactiveslot';

	# message should be issued
	ok( $node_standby->log_contains(
			"invalidating obsolete replication slot \"$inactive_slot\"",
			$log_start),
		"inactiveslot slot invalidation is logged $test_name");

	ok( $node_standby->log_contains(
			"invalidating obsolete replication slot \"$active_slot\"",
			$log_start),
		"activeslot slot invalidation is logged $test_name");

	# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
	ok( $node_standby->poll_query_until(
			'postgres',
			"select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'",
			't'),
		'confl_active_logicalslot updated'
	) or die "Timed out waiting confl_active_logicalslot to be updated";
}

# Launch $sql query, wait for a new snapshot that has a newer horizon and
# launch a VACUUM.  $vac_option is the set of options to be passed to the
# VACUUM command, $sql the sql to launch before triggering the vacuum and
# $to_vac the relation to vacuum.
#
# Note that the injection_point avoids seeing a xl_running_xacts that could
# advance an active replication slot's catalog_xmin. Advancing the active
# replication slot's catalog_xmin would break some tests that expect the
# active slot to conflict with the catalog xmin horizon.
sub wait_until_vacuum_can_remove
{
	my ($vac_option, $sql, $to_vac) = @_;

	# Note that from this point the checkpointer and bgwriter will skip writing
	# xl_running_xacts record.
	$node_primary->safe_psql('testdb',
		"SELECT injection_points_attach('skip-log-running-xacts', 'error');");

	# Get the current xid horizon,
	my $xid_horizon = $node_primary->safe_psql('testdb',
		qq[select pg_snapshot_xmin(pg_current_snapshot());]);

	# Launch our sql.
	$node_primary->safe_psql('testdb', qq[$sql]);

	# Wait until we get a newer horizon.
	$node_primary->poll_query_until('testdb',
		"SELECT (select pg_snapshot_xmin(pg_current_snapshot())::text::int - $xid_horizon) > 0"
	) or die "new snapshot does not have a newer horizon";

	# Launch the vacuum command and insert into flush_wal (see CREATE
	# TABLE flush_wal for the reason).
	$node_primary->safe_psql(
		'testdb', qq[VACUUM $vac_option verbose $to_vac;
										  INSERT INTO flush_wal DEFAULT VALUES;]);

	$node_primary->wait_for_replay_catchup($node_standby);

	# Resume generating the xl_running_xacts record
	$node_primary->safe_psql('testdb',
		"SELECT injection_points_detach('skip-log-running-xacts');");
}

########################
# Initialize primary node
########################

$node_primary->init(allows_streaming => 1, has_archiving => 1);
$node_primary->append_conf(
	'postgresql.conf', q{
wal_level = 'logical'
max_replication_slots = 4
max_wal_senders = 4
autovacuum = off
});
$node_primary->dump_info;
$node_primary->start;

# Check if the extension injection_points is available, as it may be
# possible that this script is run with installcheck, where the module
# would not be installed by default.
if (!$node_primary->check_extension('injection_points'))
{
	plan skip_all => 'Extension injection_points not installed';
}

$node_primary->psql('postgres', q[CREATE DATABASE testdb]);

$node_primary->safe_psql('testdb',
	qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]
);

# Check conflicting is NULL for physical slot
$res = $node_primary->safe_psql(
	'postgres', qq[
		 SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
);

is($res, 't', "Physical slot reports conflicting as NULL");

my $backup_name = 'b1';
$node_primary->backup($backup_name);

# Some tests need to wait for VACUUM to be replayed. But vacuum does not flush
# WAL. An insert into flush_wal outside transaction does guarantee a flush.
$node_primary->psql('testdb', q[CREATE TABLE flush_wal();]);

#######################
# Initialize standby node
#######################

$node_standby->init_from_backup(
	$node_primary, $backup_name,
	has_streaming => 1,
	has_restoring => 1);
$node_standby->append_conf(
	'postgresql.conf',
	qq[primary_slot_name = '$primary_slotname'
       max_replication_slots = 5]);
$node_standby->start;
$node_primary->wait_for_replay_catchup($node_standby);

#######################
# Initialize subscriber node
#######################
$node_subscriber->init;
$node_subscriber->start;

my %psql_subscriber = (
	'subscriber_stdin' => '',
	'subscriber_stdout' => '',
	'subscriber_stderr' => '');
$psql_subscriber{run} = IPC::Run::start(
	[
		'psql', '--no-psqlrc', '--no-align',
		'--file' => '-',
		'--dbname' => $node_subscriber->connstr('postgres')
	],
	'<' => \$psql_subscriber{subscriber_stdin},
	'>' => \$psql_subscriber{subscriber_stdout},
	'2>' => \$psql_subscriber{subscriber_stderr},
	IPC::Run::timeout($default_timeout));

##################################################
# Test that the standby requires hot_standby to be
# enabled for pre-existing logical slots.
##################################################

# create the logical slots
$node_standby->create_logical_slot_on_standby($node_primary, 'restart_test');
$node_standby->stop;
$node_standby->append_conf('postgresql.conf', qq[hot_standby = off]);

# Use run_log instead of $node_standby->start because this test expects
# that the server ends with an error during startup.
run_log(
	[
		'pg_ctl',
		'--pgdata' => $node_standby->data_dir,
		'--log' => $node_standby->logfile,
		'start',
	]);

# wait for postgres to terminate
foreach my $i (0 .. 10 * $PostgreSQL::Test::Utils::timeout_default)
{
	last if !-f $node_standby->data_dir . '/postmaster.pid';
	usleep(100_000);
}

# Confirm that the server startup fails with an expected error
my $logfile = slurp_file($node_standby->logfile());
like(
	$logfile,
	qr/FATAL: .* logical replication slot ".*" exists on the standby, but "hot_standby" = "off"/,
	"the standby ends with an error during startup because hot_standby was disabled"
);
$node_standby->adjust_conf('postgresql.conf', 'hot_standby', 'on');
$node_standby->start;
$node_standby->safe_psql('postgres',
	qq[SELECT pg_drop_replication_slot('restart_test')]);

##################################################
# Test that logical decoding on the standby
# behaves correctly.
##################################################

# create the logical slots
create_logical_slots($node_standby, 'behaves_ok_');

$node_primary->safe_psql('testdb',
	qq[CREATE TABLE decoding_test(x integer, y text);]);
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
);

$node_primary->wait_for_replay_catchup($node_standby);

my $result = $node_standby->safe_psql('testdb',
	qq[SELECT pg_logical_slot_get_changes('behaves_ok_activeslot', NULL, NULL);]
);

# test if basic decoding works
is(scalar(my @foobar = split /^/m, $result),
	14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');

# Insert some rows and verify that we get the same results from pg_recvlogical
# and the SQL interface.
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
);

my $expected = q{BEGIN
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
COMMIT};

$node_primary->wait_for_replay_catchup($node_standby);

my $stdout_sql = $node_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected, 'got expected output from SQL decoding session');

my $endpos = $node_standby->safe_psql('testdb',
	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
);

# Insert some rows after $endpos, which we won't read.
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
);

$node_primary->wait_for_replay_catchup($node_standby);

my $stdout_recv = $node_standby->pg_recvlogical_upto(
	'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
	'include-xids' => '0',
	'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, $expected,
	'got same expected output from pg_recvlogical decoding session');

$node_standby->poll_query_until('testdb',
	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'behaves_ok_activeslot' AND active_pid IS NULL)"
) or die "slot never became inactive";

$stdout_recv = $node_standby->pg_recvlogical_upto(
	'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
	'include-xids' => '0',
	'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, '', 'pg_recvlogical acknowledged changes');

$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');

# Wait for catchup to ensure that the new database is visible to other sessions
# on the standby.
$node_primary->wait_for_replay_catchup($node_standby);

($result, $stdout, $stderr) = $node_standby->psql('otherdb',
	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
);
like(
	$stderr,
	qr/replication slot "behaves_ok_activeslot" was not created in this database/,
	"replaying logical slot from another database fails");

##################################################
# Test that we can subscribe on the standby with the publication
# created on the primary.
##################################################

# Create a table on the primary
$node_primary->safe_psql('postgres',
	"CREATE TABLE tab_rep (a int primary key)");

# Create a table (same structure) on the subscriber node
$node_subscriber->safe_psql('postgres',
	"CREATE TABLE tab_rep (a int primary key)");

# Create a publication on the primary
$node_primary->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub for table tab_rep");

$node_primary->wait_for_replay_catchup($node_standby);

# Subscribe on the standby
my $standby_connstr = $node_standby->connstr . ' dbname=postgres';

# Not using safe_psql() here as it would wait for activity on the primary
# and we wouldn't be able to launch pg_log_standby_snapshot() on the primary
# while waiting.
# psql_subscriber() allows to not wait synchronously.
$psql_subscriber{subscriber_stdin} .= qq[CREATE SUBSCRIPTION tap_sub
     CONNECTION '$standby_connstr'
     PUBLICATION tap_pub
     WITH (copy_data = off);];
$psql_subscriber{subscriber_stdin} .= "\n";

$psql_subscriber{run}->pump_nb();

# Log the standby snapshot to speed up the subscription creation
$node_primary->log_standby_snapshot($node_standby, 'tap_sub');

# Explicitly shut down psql instance gracefully - to avoid hangs
# or worse on windows
$psql_subscriber{subscriber_stdin} .= "\\q\n";
$psql_subscriber{run}->finish;

$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');

# Insert some rows on the primary
$node_primary->safe_psql('postgres',
	qq[INSERT INTO tab_rep select generate_series(1,10);]);

$node_primary->wait_for_replay_catchup($node_standby);
$node_standby->wait_for_catchup('tap_sub');

# Check that the subscriber can see the rows inserted in the primary
$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
is($result, qq(10), 'check replicated inserts after subscription on standby');

# We do not need the subscription and the subscriber anymore
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
$node_subscriber->stop;

# Create the injection_points extension
$node_primary->safe_psql('testdb', 'CREATE EXTENSION injection_points;');

##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 1: hot_standby_feedback off and vacuum FULL
#
# In passing, ensure that replication slot stats are not removed when the
# active slot is invalidated, and check that an error occurs when
# attempting to alter the invalid slot.
##################################################

# One way to produce recovery conflict is to create/drop a relation and
# launch a vacuum full on pg_class with hot_standby_feedback turned off on
# the standby.
reactive_slots_change_hfs_and_wait_for_xmins('behaves_ok_', 'vacuum_full_',
	0, 1);

# Ensure that replication slot stats are not empty before triggering the
# conflict.
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]);

$node_standby->poll_query_until('testdb',
	qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
) or die "replication slot stats of vacuum_full_activeslot not updated";

# This should trigger the conflict
wait_until_vacuum_can_remove(
	'full', 'CREATE TABLE conflict_test(x integer, y text);
								 DROP TABLE conflict_test;', 'pg_class');

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');

# Verify reason for conflict is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('vacuum_full_', 'rows_removed');

# Attempting to alter an invalidated slot should result in an error
($result, $stdout, $stderr) = $node_standby->psql(
	'postgres',
	qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
	replication => 'database');
ok( $stderr =~
	  /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
	  && $stderr =~
	  /DETAIL:  This replication slot has been invalidated due to "rows_removed"./,
	"invalidated slot cannot be altered");

# Ensure that replication slot stats are not removed after invalidation.
is( $node_standby->safe_psql(
		'testdb',
		qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
	),
	't',
	'replication slot stats not removed after invalidation');

$handle =
  make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer access replication slot \"vacuum_full_activeslot\"");

# Attempt to copy an invalidated logical replication slot
($result, $stdout, $stderr) = $node_standby->psql(
	'postgres',
	qq[select pg_copy_logical_replication_slot('vacuum_full_inactiveslot', 'vacuum_full_inactiveslot_copy');],
	replication => 'database');
like(
	$stderr,
	qr/ERROR:  cannot copy invalidated replication slot "vacuum_full_inactiveslot"/,
	"invalidated slot cannot be copied");

# Set hot_standby_feedback to on
change_hot_standby_feedback_and_wait_for_xmins(1, 1);

##################################################
# Verify that invalidated logical slots stay invalidated across a restart.
##################################################
$node_standby->restart;

# Verify reason for conflict is retained across a restart.
check_slots_conflict_reason('vacuum_full_', 'rows_removed');

##################################################
# Verify that invalidated logical slots do not lead to retaining WAL.
##################################################

# Get the restart_lsn from an invalidated slot
my $restart_lsn = $node_standby->safe_psql(
	'postgres',
	"SELECT restart_lsn FROM pg_replication_slots
		WHERE slot_name = 'vacuum_full_activeslot' AND conflicting;"
);

chomp($restart_lsn);

# As pg_walfile_name() can not be executed on the standby,
# get the WAL file name associated to this lsn from the primary
my $walfile_name = $node_primary->safe_psql('postgres',
	"SELECT pg_walfile_name('$restart_lsn')");

chomp($walfile_name);

# Generate some activity and switch WAL file on the primary
$node_primary->advance_wal(1);
$node_primary->safe_psql('postgres', "checkpoint;");

# Wait for the standby to catch up
$node_primary->wait_for_replay_catchup($node_standby);

# Request a checkpoint on the standby to trigger the WAL file(s) removal
$node_standby->safe_psql('postgres', 'checkpoint;');

# Verify that the WAL file has not been retained on the standby
my $standby_walfile = $node_standby->data_dir . '/pg_wal/' . $walfile_name;
ok(!-f "$standby_walfile",
	"invalidated logical slots do not lead to retaining WAL");

##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 2: conflict due to row removal with hot_standby_feedback off.
##################################################

# get the position to search from in the standby logfile
my $logstart = -s $node_standby->logfile;

# One way to produce recovery conflict is to create/drop a relation and
# launch a vacuum on pg_class with hot_standby_feedback turned off on the
# standby.
reactive_slots_change_hfs_and_wait_for_xmins('vacuum_full_', 'row_removal_',
	0, 1);

# This should trigger the conflict
wait_until_vacuum_can_remove(
	'', 'CREATE TABLE conflict_test(x integer, y text);
							 DROP TABLE conflict_test;', 'pg_class');

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');

# Verify reason for conflict is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('row_removal_', 'rows_removed');

$handle =
  make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer access replication slot \"row_removal_activeslot\"");

##################################################
# Recovery conflict: Same as Scenario 2 but on a shared catalog table
# Scenario 3: conflict due to row removal with hot_standby_feedback off.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# One way to produce recovery conflict on a shared catalog table is to
# create/drop a role and launch a vacuum on pg_authid with
# hot_standby_feedback turned off on the standby.
reactive_slots_change_hfs_and_wait_for_xmins('row_removal_',
	'shared_row_removal_', 0, 1);

# Trigger the conflict
wait_until_vacuum_can_remove(
	'', 'CREATE ROLE create_trash;
							 DROP ROLE create_trash;', 'pg_authid');

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('shared_row_removal_', $logstart,
	'with vacuum on pg_authid');

# Verify reason for conflict is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('shared_row_removal_', 'rows_removed');

$handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
	\$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer access replication slot \"shared_row_removal_activeslot\""
);

##################################################
# Recovery conflict: Same as Scenario 2 but on a non catalog table
# Scenario 4: No conflict expected.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

reactive_slots_change_hfs_and_wait_for_xmins('shared_row_removal_',
	'no_conflict_', 0, 1);

# This should not trigger a conflict
wait_until_vacuum_can_remove(
	'', 'CREATE TABLE conflict_test(x integer, y text);
							 INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;
							 UPDATE conflict_test set x=1, y=1;', 'conflict_test');

# message should not be issued
ok( !$node_standby->log_contains(
		"invalidating obsolete replication slot \"no_conflict_inactiveslot\"", $logstart),
	'inactiveslot slot invalidation is not logged with vacuum on conflict_test'
);

ok( !$node_standby->log_contains(
		"invalidating obsolete replication slot \"no_conflict_activeslot\"", $logstart),
	'activeslot slot invalidation is not logged with vacuum on conflict_test'
);

# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
ok( $node_standby->poll_query_until(
		'postgres',
		"select (confl_active_logicalslot = 0) from pg_stat_database_conflicts where datname = 'testdb'",
		't'),
	'confl_active_logicalslot not updated'
) or die "Timed out waiting confl_active_logicalslot to be updated";

# Verify slots are reported as non conflicting in pg_replication_slots
is( $node_standby->safe_psql(
		'postgres',
		q[select bool_or(conflicting) from
		  (select conflicting from pg_replication_slots
			where slot_type = 'logical')]),
	'f',
	'Logical slots are reported as non conflicting');

# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 0);

# Restart the standby node to ensure no slots are still active
$node_standby->restart;

##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 5: conflict due to on-access pruning.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# One way to produce recovery conflict is to trigger an on-access pruning
# on a relation marked as user_catalog_table.
reactive_slots_change_hfs_and_wait_for_xmins('no_conflict_', 'pruning_', 0,
	0);

# Injection point avoids seeing a xl_running_xacts. This is required because if
# it is generated between the last two updates, then the catalog_xmin of the
# active slot could be updated, and hence, the conflict won't occur. See
# comments atop wait_until_vacuum_can_remove.
$node_primary->safe_psql('testdb',
	"SELECT injection_points_attach('skip-log-running-xacts', 'error');");

# This should trigger the conflict
$node_primary->safe_psql('testdb',
	qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]
);
$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);

$node_primary->wait_for_replay_catchup($node_standby);

# Resume generating the xl_running_xacts record
$node_primary->safe_psql('testdb',
	"SELECT injection_points_detach('skip-log-running-xacts');");

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('pruning_', $logstart, 'with on-access pruning');

# Verify reason for conflict is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('pruning_', 'rows_removed');

$handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer access replication slot \"pruning_activeslot\"");

# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 1);

##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 6: incorrect wal_level on primary.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# drop the logical slots
drop_logical_slots('pruning_');

# create the logical slots
create_logical_slots($node_standby, 'wal_level_');

$handle =
  make_slot_active($node_standby, 'wal_level_', 1, \$stdout, \$stderr);

# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
$node_standby->psql('testdb', q[select pg_stat_reset();]);

# Make primary wal_level replica. This will trigger slot conflict.
$node_primary->append_conf(
	'postgresql.conf', q[
wal_level = 'replica'
]);
$node_primary->restart;

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('wal_level_', $logstart, 'due to wal_level');

# Verify reason for conflict is 'wal_level_insufficient' in pg_replication_slots
check_slots_conflict_reason('wal_level_', 'wal_level_insufficient');

$handle =
  make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
# We are not able to read from the slot as it requires effective_wal_level >= logical on
# the primary server
check_pg_recvlogical_stderr($handle,
	"logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"
);

# Restore primary wal_level
$node_primary->append_conf(
	'postgresql.conf', q[
wal_level = 'logical'
]);
$node_primary->restart;
$node_primary->wait_for_replay_catchup($node_standby);

$handle =
  make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
# as the slot has been invalidated we should not be able to read
check_pg_recvlogical_stderr($handle,
	"can no longer access replication slot \"wal_level_activeslot\"");

##################################################
# DROP DATABASE should drop its slots, including active slots.
##################################################

# drop the logical slots
drop_logical_slots('wal_level_');

# create the logical slots
create_logical_slots($node_standby, 'drop_db_');

$handle = make_slot_active($node_standby, 'drop_db_', 1, \$stdout, \$stderr);

# Create a slot on a database that would not be dropped. This slot should not
# get dropped.
$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot',
	'postgres');

# dropdb on the primary to verify slots are dropped on standby
$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);

$node_primary->wait_for_replay_catchup($node_standby);

is( $node_standby->safe_psql(
		'postgres',
		q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]),
	'f',
	'database dropped on standby');

check_slots_dropped('drop_db', $handle);

is($node_standby->slot('otherslot')->{'slot_type'},
	'logical', 'otherslot on standby not dropped');

# Cleanup : manually drop the slot that was not dropped.
$node_standby->psql('postgres',
	q[SELECT pg_drop_replication_slot('otherslot')]);

##################################################
# Test standby promotion and logical decoding behavior
# after the standby gets promoted.
##################################################

$node_standby->reload;

$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
$node_primary->safe_psql('testdb',
	qq[CREATE TABLE decoding_test(x integer, y text);]);

# Wait for the standby to catchup before initializing the cascading standby
$node_primary->wait_for_replay_catchup($node_standby);

# Create a physical replication slot on the standby.
# Keep this step after the "Verify that invalidated logical slots do not lead
# to retaining WAL" test (as the physical slot on the standby could prevent the
# WAL file removal).
$node_standby->safe_psql('testdb',
	qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]
);

# Initialize cascading standby node
$node_standby->backup($backup_name);
$node_cascading_standby->init_from_backup(
	$node_standby, $backup_name,
	has_streaming => 1,
	has_restoring => 1);
$node_cascading_standby->append_conf(
	'postgresql.conf',
	qq[primary_slot_name = '$standby_physical_slotname'
	   hot_standby_feedback = on]);
$node_cascading_standby->start;

# create the logical slots
create_logical_slots($node_standby, 'promotion_');

# Wait for the cascading standby to catchup before creating the slots
$node_standby->wait_for_replay_catchup($node_cascading_standby,
	$node_primary);

# create the logical slots on the cascading standby too
create_logical_slots($node_cascading_standby, 'promotion_');

# Make slots actives
$handle =
  make_slot_active($node_standby, 'promotion_', 1, \$stdout, \$stderr);
my $cascading_handle =
  make_slot_active($node_cascading_standby, 'promotion_', 1,
	\$cascading_stdout, \$cascading_stderr);

# Insert some rows before the promotion
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
);

# Wait for both standbys to catchup
$node_primary->wait_for_replay_catchup($node_standby);
$node_standby->wait_for_replay_catchup($node_cascading_standby,
	$node_primary);

# promote
$node_standby->promote;

# insert some rows on promoted standby
$node_standby->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
);

# Wait for the cascading standby to catchup
$node_standby->wait_for_replay_catchup($node_cascading_standby);

$expected = q{BEGIN
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
COMMIT
BEGIN
table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
COMMIT};

# check that we are decoding pre and post promotion inserted rows
$stdout_sql = $node_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected,
	'got expected output from SQL decoding session on promoted standby');

# check that we are decoding pre and post promotion inserted rows
# with pg_recvlogical that has started before the promotion
my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);

ok(pump_until($handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
	'got 2 COMMIT from pg_recvlogical output');

chomp($stdout);
is($stdout, $expected,
	'got same expected output from pg_recvlogical decoding session');

# check that we are decoding pre and post promotion inserted rows on the cascading standby
$stdout_sql = $node_cascading_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected,
	'got expected output from SQL decoding session on cascading standby');

# check that we are decoding pre and post promotion inserted rows
# with pg_recvlogical that has started before the promotion on the cascading standby
ok( pump_until(
		$cascading_handle, $pump_timeout,
		\$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
	'got 2 COMMIT from pg_recvlogical output');

chomp($cascading_stdout);
is($cascading_stdout, $expected,
	'got same expected output from pg_recvlogical decoding session on cascading standby'
);

done_testing();
