Log the conflicts while applying changes in logical replication.
author Amit Kapila <akapila@postgresql.org>
Tue, 20 Aug 2024 03:05:11 +0000 (08:35 +0530)
committer Amit Kapila <akapila@postgresql.org>
Tue, 20 Aug 2024 03:05:11 +0000 (08:35 +0530)
This patch provides additional logging information in the following
conflict scenarios while applying changes:

insert_exists: Inserting a row that violates a NOT DEFERRABLE unique constraint.
update_differ: Updating a row that was previously modified by another origin.
update_exists: The updated row value violates a NOT DEFERRABLE unique constraint.
update_missing: The tuple to be updated is missing.
delete_differ: Deleting a row that was previously modified by another origin.
delete_missing: The tuple to be deleted is missing.

For insert_exists and update_exists conflicts, the log can include the origin
and commit timestamp details of the conflicting key when
track_commit_timestamp is enabled on the subscriber.

update_differ and delete_differ conflicts can only be detected when
track_commit_timestamp is enabled on the subscriber.

We do not offer additional logging for exclusion constraint violations because
these constraints can specify rules that are more complex than simple equality
checks. Resolving such conflicts won't be straightforward. This area can be
further enhanced if required.
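
As an illustration, here is a minimal sketch of provoking the insert_exists
case on a hypothetical publisher/subscriber pair (the node names, the table
"test" with unique key column "c", and the values are assumptions for
illustration, not part of this commit):

-- Subscriber: allow origin and commit timestamp details in conflict logs
-- (changing track_commit_timestamp requires a server restart).
test_sub=# ALTER SYSTEM SET track_commit_timestamp = on;

-- Subscriber: create a local row that will conflict.
test_sub=# INSERT INTO test VALUES (1, 'local');

-- Publisher: insert a row with the same key; on arrival, the subscriber's
-- apply worker reports conflict=insert_exists in its server log.
test_pub=# INSERT INTO test VALUES (1, 'remote');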

Author: Hou Zhijie
Reviewed-by: Shveta Malik, Amit Kapila, Nisha Moond, Hayato Kuroda, Dilip Kumar
Discussion: https://postgr.es/m/OS0PR01MB5716352552DFADB8E9AD1D8994C92@OS0PR01MB5716.jpnprd01.prod.outlook.com

18 files changed:
doc/src/sgml/logical-replication.sgml
src/backend/access/index/genam.c
src/backend/catalog/index.c
src/backend/executor/execIndexing.c
src/backend/executor/execMain.c
src/backend/executor/execReplication.c
src/backend/executor/nodeModifyTable.c
src/backend/replication/logical/Makefile
src/backend/replication/logical/conflict.c [new file with mode: 0644]
src/backend/replication/logical/meson.build
src/backend/replication/logical/worker.c
src/include/executor/executor.h
src/include/replication/conflict.h [new file with mode: 0644]
src/test/subscription/t/001_rep_changes.pl
src/test/subscription/t/013_partition.pl
src/test/subscription/t/029_on_error.pl
src/test/subscription/t/030_origin.pl
src/tools/pgindent/typedefs.list

index a23a3d57e2b47bc1d52646a5ec6a5b3be47523de..885a2d70ae77ef59f57372d24239259f58993f24 100644 (file)
@@ -1579,8 +1579,91 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
    node.  If incoming data violates any constraints the replication will
    stop.  This is referred to as a <firstterm>conflict</firstterm>.  When
    replicating <command>UPDATE</command> or <command>DELETE</command>
-   operations, missing data will not produce a conflict and such operations
-   will simply be skipped.
+   operations, missing data is also considered a
+   <firstterm>conflict</firstterm>, but does not result in an error and such
+   operations will simply be skipped.
+  </para>
+
+  <para>
+   Additional logging is triggered in the following <firstterm>conflict</firstterm>
+   cases:
+   <variablelist>
+    <varlistentry>
+     <term><literal>insert_exists</literal></term>
+     <listitem>
+      <para>
+       Inserting a row that violates a <literal>NOT DEFERRABLE</literal>
+       unique constraint. Note that to log the origin and commit
+       timestamp details of the conflicting key,
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       should be enabled on the subscriber. In this case, an error will be
+       raised, and replication will not proceed until the conflict is
+       resolved manually.
+      </para>
+     </listitem>
+    </varlistentry>
+    <varlistentry>
+     <term><literal>update_differ</literal></term>
+     <listitem>
+      <para>
+       Updating a row that was previously modified by another origin.
+       Note that this conflict can only be detected when
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       is enabled on the subscriber. Currently, the update is always applied
+       regardless of the origin of the local row.
+      </para>
+     </listitem>
+    </varlistentry>
+    <varlistentry>
+     <term><literal>update_exists</literal></term>
+     <listitem>
+      <para>
+       The updated value of a row violates a <literal>NOT DEFERRABLE</literal>
+       unique constraint. Note that to log the origin and commit
+       timestamp details of the conflicting key,
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       should be enabled on the subscriber. In this case, an error will be
+       raised, and replication will not proceed until the conflict is
+       resolved manually. Note that when updating a
+       partitioned table, if the updated row value satisfies another partition
+       constraint resulting in the row being inserted into a new partition, the
+       <literal>insert_exists</literal> conflict may arise if the new row
+       violates a <literal>NOT DEFERRABLE</literal> unique constraint.
+      </para>
+     </listitem>
+    </varlistentry>
+    <varlistentry>
+     <term><literal>update_missing</literal></term>
+     <listitem>
+      <para>
+       The tuple to be updated was not found. The update will simply be
+       skipped in this scenario.
+      </para>
+     </listitem>
+    </varlistentry>
+    <varlistentry>
+     <term><literal>delete_differ</literal></term>
+     <listitem>
+      <para>
+       Deleting a row that was previously modified by another origin. Note that
+       this conflict can only be detected when
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       is enabled on the subscriber. Currently, the delete is always applied
+       regardless of the origin of the local row.
+      </para>
+     </listitem>
+    </varlistentry>
+    <varlistentry>
+     <term><literal>delete_missing</literal></term>
+     <listitem>
+      <para>
+       The tuple to be deleted was not found. The delete will simply be
+       skipped in this scenario.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+    Note that there are other conflict scenarios, such as exclusion constraint
+    violations. Currently, we do not provide additional details for them in the
+    log.
   </para>
 
   <para>
@@ -1597,7 +1680,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   A conflict will produce an error and will stop the replication; it must be
+   A conflict that produces an error will stop the replication; it must be
    resolved manually by the user.  Details about the conflict can be found in
    the subscriber's server log.
   </para>
@@ -1609,8 +1692,9 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
    an error, the replication won't proceed, and the logical replication worker will
    emit the following kind of message to the subscriber's server log:
 <screen>
-ERROR:  duplicate key value violates unique constraint "test_pkey"
-DETAIL:  Key (c)=(1) already exists.
+ERROR:  conflict detected on relation "public.test": conflict=insert_exists
+DETAIL:  Key already exists in unique index "test_pkey", modified locally in transaction 740 at 2024-06-26 10:47:04.727375+08.
+Key (c)=(1); existing local tuple (1, 'local'); remote tuple (1, 'remote').
 CONTEXT:  processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
 </screen>
    The LSN of the transaction that contains the change violating the constraint and
@@ -1636,6 +1720,15 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    Please note that skipping the whole transaction includes skipping changes that
    might not violate any constraint.  This can easily make the subscriber
    inconsistent.
+   The additional details regarding conflicting rows, such as their origin
+   and commit timestamp, can be seen in the <literal>DETAIL</literal> line
+   of the log. But note that this information is only available when
+   <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+   is enabled on the subscriber. Users can use this information to decide
+   whether to retain the local change or adopt the remote change. For
+   instance, the <literal>DETAIL</literal> line in the above log indicates
+   that the existing row was modified locally, so the user can manually
+   perform a remote-change-win by resolving the conflict in favor of the
+   remote data.
   </para>
 
   <para>
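
For the remote-change-win mentioned above, a hedged sketch of one manual
resolution (same hypothetical names as the documentation example): remove the
conflicting local row so that the remote change applies when the apply worker
retries the failed transaction.

-- Subscriber: drop the conflicting local tuple; the next retry of the
-- remote INSERT then succeeds and replication resumes.
test_sub=# DELETE FROM test WHERE c = 1;
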
index de751e8e4a30af3e06c7315e8db2de62793cd885..43c95d6109b377f10f34b522b4020e04555bf74c 100644 (file)
@@ -154,8 +154,9 @@ IndexScanEnd(IndexScanDesc scan)
  *
  * Construct a string describing the contents of an index entry, in the
  * form "(key_name, ...)=(key_value, ...)".  This is currently used
- * for building unique-constraint and exclusion-constraint error messages,
- * so only key columns of the index are checked and printed.
+ * for building unique-constraint, exclusion-constraint, and logical
+ * replication conflict error messages, so only key columns of the index
+ * are checked and printed.
  *
  * Note that if the user does not have permissions to view all of the
  * columns involved then a NULL is returned.  Returning a partial key seems
index a819b4197cee703ae3f56ca51e0b3382de9e5f1e..33759056e372d271a0146fe3ccccdef4a859cbd9 100644 (file)
@@ -2631,8 +2631,9 @@ CompareIndexInfo(const IndexInfo *info1, const IndexInfo *info2,
  *         Add extra state to IndexInfo record
  *
  * For unique indexes, we usually don't want to add info to the IndexInfo for
- * checking uniqueness, since the B-Tree AM handles that directly.  However,
- * in the case of speculative insertion, additional support is required.
+ * checking uniqueness, since the B-Tree AM handles that directly.  However, in
+ * the case of speculative insertion and conflict detection in logical
+ * replication, additional support is required.
  *
  * Do this processing here rather than in BuildIndexInfo() to not incur the
  * overhead in the common non-speculative cases.
index 9f05b3654c18665c7ccf79a1c4fd09fd05f5b5b4..403a3f4055121800afba28ce6d1c26e61b87d958 100644 (file)
@@ -207,8 +207,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
        ii = BuildIndexInfo(indexDesc);
 
        /*
-        * If the indexes are to be used for speculative insertion, add extra
-        * information required by unique index entries.
+        * If the indexes are to be used for speculative insertion or conflict
+        * detection in logical replication, add extra information required by
+        * unique index entries.
         */
        if (speculative && ii->ii_Unique)
            BuildSpeculativeIndexInfo(indexDesc, ii);
@@ -519,14 +520,18 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
  *
  *     Note that this doesn't lock the values in any way, so it's
  *     possible that a conflicting tuple is inserted immediately
- *     after this returns.  But this can be used for a pre-check
- *     before insertion.
+ *     after this returns.  This can be used for either a pre-check
+ *     before insertion or a re-check after finding a conflict.
+ *
+ *     'tupleid' should be the TID of the tuple that has been recently
+ *     inserted (or can be invalid if we haven't inserted a new tuple yet).
+ *     This tuple will be excluded from conflict checking.
  * ----------------------------------------------------------------
  */
 bool
 ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
                          EState *estate, ItemPointer conflictTid,
-                         List *arbiterIndexes)
+                         ItemPointer tupleid, List *arbiterIndexes)
 {
    int         i;
    int         numIndices;
@@ -629,7 +634,7 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
 
        satisfiesConstraint =
            check_exclusion_or_unique_constraint(heapRelation, indexRelation,
-                                                indexInfo, &invalidItemPtr,
+                                                indexInfo, tupleid,
                                                 values, isnull, estate, false,
                                                 CEOUC_WAIT, true,
                                                 conflictTid);
index 4d7c92d63c19800977cd2567c2e494c79edbcd3e..29e186fa73dca40a353472ffc634da4402fcdff7 100644 (file)
@@ -88,11 +88,6 @@ static bool ExecCheckPermissionsModified(Oid relOid, Oid userid,
                                         Bitmapset *modifiedCols,
                                         AclMode requiredPerms);
 static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
-static char *ExecBuildSlotValueDescription(Oid reloid,
-                                          TupleTableSlot *slot,
-                                          TupleDesc tupdesc,
-                                          Bitmapset *modifiedCols,
-                                          int maxfieldlen);
 static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);
 
 /* end of local decls */
@@ -2210,7 +2205,7 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
  * column involved, that subset will be returned with a key identifying which
  * columns they are.
  */
-static char *
+char *
 ExecBuildSlotValueDescription(Oid reloid,
                              TupleTableSlot *slot,
                              TupleDesc tupdesc,
index d0a89cd577821a0038ba7c2455c143726f646421..1086cbc9624766c739589517731a59ea4a91a45d 100644 (file)
@@ -23,6 +23,7 @@
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
+#include "replication/conflict.h"
 #include "replication/logicalrelation.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -166,6 +167,51 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
    return skey_attoff;
 }
 
+
+/*
+ * Helper function to check if it is necessary to re-fetch and lock the tuple
+ * due to concurrent modifications. This function should be called after
+ * invoking table_tuple_lock.
+ */
+static bool
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+{
+   bool        refetch = false;
+
+   switch (res)
+   {
+       case TM_Ok:
+           break;
+       case TM_Updated:
+           /* XXX: Improve handling here */
+           if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+               ereport(LOG,
+                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                        errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+           else
+               ereport(LOG,
+                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                        errmsg("concurrent update, retrying")));
+           refetch = true;
+           break;
+       case TM_Deleted:
+           /* XXX: Improve handling here */
+           ereport(LOG,
+                   (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                    errmsg("concurrent delete, retrying")));
+           refetch = true;
+           break;
+       case TM_Invisible:
+           elog(ERROR, "attempted to lock invisible tuple");
+           break;
+       default:
+           elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+           break;
+   }
+
+   return refetch;
+}
+
 /*
  * Search the relation 'rel' for tuple using the index.
  *
@@ -260,34 +306,8 @@ retry:
 
        PopActiveSnapshot();
 
-       switch (res)
-       {
-           case TM_Ok:
-               break;
-           case TM_Updated:
-               /* XXX: Improve handling here */
-               if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
-                   ereport(LOG,
-                           (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                            errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-               else
-                   ereport(LOG,
-                           (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                            errmsg("concurrent update, retrying")));
-               goto retry;
-           case TM_Deleted:
-               /* XXX: Improve handling here */
-               ereport(LOG,
-                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                        errmsg("concurrent delete, retrying")));
-               goto retry;
-           case TM_Invisible:
-               elog(ERROR, "attempted to lock invisible tuple");
-               break;
-           default:
-               elog(ERROR, "unexpected table_tuple_lock status: %u", res);
-               break;
-       }
+       if (should_refetch_tuple(res, &tmfd))
+           goto retry;
    }
 
    index_endscan(scan);
@@ -444,34 +464,8 @@ retry:
 
        PopActiveSnapshot();
 
-       switch (res)
-       {
-           case TM_Ok:
-               break;
-           case TM_Updated:
-               /* XXX: Improve handling here */
-               if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
-                   ereport(LOG,
-                           (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                            errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-               else
-                   ereport(LOG,
-                           (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                            errmsg("concurrent update, retrying")));
-               goto retry;
-           case TM_Deleted:
-               /* XXX: Improve handling here */
-               ereport(LOG,
-                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                        errmsg("concurrent delete, retrying")));
-               goto retry;
-           case TM_Invisible:
-               elog(ERROR, "attempted to lock invisible tuple");
-               break;
-           default:
-               elog(ERROR, "unexpected table_tuple_lock status: %u", res);
-               break;
-       }
+       if (should_refetch_tuple(res, &tmfd))
+           goto retry;
    }
 
    table_endscan(scan);
@@ -480,6 +474,89 @@ retry:
    return found;
 }
 
+/*
+ * Find the tuple that violates the passed unique index (conflictindex).
+ *
+ * If a conflicting tuple is found, return true; otherwise, return false.
+ *
+ * We lock the tuple to prevent it from being deleted before the caller can
+ * fetch the required information. Note that if the tuple is deleted before
+ * a lock is acquired, we will retry the search for a conflicting tuple.
+ */
+static bool
+FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
+                 Oid conflictindex, TupleTableSlot *slot,
+                 TupleTableSlot **conflictslot)
+{
+   Relation    rel = resultRelInfo->ri_RelationDesc;
+   ItemPointerData conflictTid;
+   TM_FailureData tmfd;
+   TM_Result   res;
+
+   *conflictslot = NULL;
+
+retry:
+   if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
+                                 &conflictTid, &slot->tts_tid,
+                                 list_make1_oid(conflictindex)))
+   {
+       if (*conflictslot)
+           ExecDropSingleTupleTableSlot(*conflictslot);
+
+       *conflictslot = NULL;
+       return false;
+   }
+
+   *conflictslot = table_slot_create(rel, NULL);
+
+   PushActiveSnapshot(GetLatestSnapshot());
+
+   res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
+                          *conflictslot,
+                          GetCurrentCommandId(false),
+                          LockTupleShare,
+                          LockWaitBlock,
+                          0 /* don't follow updates */ ,
+                          &tmfd);
+
+   PopActiveSnapshot();
+
+   if (should_refetch_tuple(res, &tmfd))
+       goto retry;
+
+   return true;
+}
+
+/*
+ * Check all the unique indexes in 'recheckIndexes' for a conflict with the
+ * tuple in 'remoteslot' and report if found.
+ */
+static void
+CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
+                      ConflictType type, List *recheckIndexes,
+                      TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
+{
+   /* Check all the unique indexes for a conflict */
+   foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+   {
+       TupleTableSlot *conflictslot;
+
+       if (list_member_oid(recheckIndexes, uniqueidx) &&
+           FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
+                             &conflictslot))
+       {
+           RepOriginId origin;
+           TimestampTz committs;
+           TransactionId xmin;
+
+           GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+           ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+                               searchslot, conflictslot, remoteslot,
+                               uniqueidx, xmin, origin, committs);
+       }
+   }
+}
+
 /*
  * Insert tuple represented in the slot to the relation, update the indexes,
  * and execute any constraints and per-row triggers.
@@ -509,6 +586,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
    if (!skip_tuple)
    {
        List       *recheckIndexes = NIL;
+       List       *conflictindexes;
+       bool        conflict = false;
 
        /* Compute stored generated columns */
        if (rel->rd_att->constr &&
@@ -525,10 +604,33 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
        /* OK, store the tuple and create index entries for it */
        simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
 
+       conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
        if (resultRelInfo->ri_NumIndices > 0)
            recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-                                                  slot, estate, false, false,
-                                                  NULL, NIL, false);
+                                                  slot, estate, false,
+                                                  conflictindexes ? true : false,
+                                                  &conflict,
+                                                  conflictindexes, false);
+
+       /*
+        * Checks the conflict indexes to fetch the conflicting local tuple
+        * and reports the conflict. We perform this check here, instead of
+        * performing an additional index scan before the actual insertion and
+        * reporting the conflict if any conflicting tuples are found. This is
+        * to avoid the overhead of executing the extra scan for each INSERT
+        * operation, even when no conflict arises, which could introduce
+        * significant overhead to replication, particularly in cases where
+        * conflicts are rare.
+        *
+        * XXX OTOH, this could lead to clean-up effort for dead tuples added
+        * in heap and index in case of conflicts. But as conflicts shouldn't
+        * be frequent, we prefer to save the performance overhead of an extra
+        * scan before each insertion.
+        */
+       if (conflict)
+           CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
+                                  recheckIndexes, NULL, slot);
 
        /* AFTER ROW INSERT Triggers */
        ExecARInsertTriggers(estate, resultRelInfo, slot,
@@ -577,6 +679,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
    {
        List       *recheckIndexes = NIL;
        TU_UpdateIndexes update_indexes;
+       List       *conflictindexes;
+       bool        conflict = false;
 
        /* Compute stored generated columns */
        if (rel->rd_att->constr &&
@@ -593,12 +697,24 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
        simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
                                  &update_indexes);
 
+       conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
        if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
            recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-                                                  slot, estate, true, false,
-                                                  NULL, NIL,
+                                                  slot, estate, true,
+                                                  conflictindexes ? true : false,
+                                                  &conflict, conflictindexes,
                                                   (update_indexes == TU_Summarizing));
 
+       /*
+        * Refer to the comments above the call to CheckAndReportConflict() in
+        * ExecSimpleRelationInsert to understand why this check is done at
+        * this point.
+        */
+       if (conflict)
+           CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
+                                  recheckIndexes, searchslot, slot);
+
        /* AFTER ROW UPDATE Triggers */
        ExecARUpdateTriggers(estate, resultRelInfo,
                             NULL, NULL,
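
The CT_UPDATE_EXISTS path above can be exercised from SQL; a sketch under the
same hypothetical schema as the documentation example (names and values are
assumptions):

-- Subscriber: a local row already occupies key 2.
test_sub=# INSERT INTO test VALUES (2, 'local');

-- Publisher: update a replicated row so its key collides with that local
-- row; the apply worker raises an ERROR with conflict=update_exists and
-- replication stops until the conflict is resolved manually.
test_pub=# UPDATE test SET c = 2 WHERE c = 1;
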
index 4913e493199dbef5311e54746b080820426ea186..8bf4c80d4a01b99869f951708785959718d1ab94 100644 (file)
@@ -1019,9 +1019,11 @@ ExecInsert(ModifyTableContext *context,
            /* Perform a speculative insertion. */
            uint32      specToken;
            ItemPointerData conflictTid;
+           ItemPointerData invalidItemPtr;
            bool        specConflict;
            List       *arbiterIndexes;
 
+           ItemPointerSetInvalid(&invalidItemPtr);
            arbiterIndexes = resultRelInfo->ri_onConflictArbiterIndexes;
 
            /*
@@ -1041,7 +1043,8 @@ ExecInsert(ModifyTableContext *context,
            CHECK_FOR_INTERRUPTS();
            specConflict = false;
            if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
-                                          &conflictTid, arbiterIndexes))
+                                          &conflictTid, &invalidItemPtr,
+                                          arbiterIndexes))
            {
                /* committed conflict tuple found */
                if (onconflict == ONCONFLICT_UPDATE)
index ba03eeff1c6ee65a80702f87fe7509a5b0b8162d..1e08bbbd4eb15496a61317e02c662dc79feb3354 100644 (file)
@@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = \
    applyparallelworker.o \
+   conflict.o \
    decode.o \
    launcher.o \
    logical.o \
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
new file mode 100644 (file)
index 0000000..0bc7959
--- /dev/null
@@ -0,0 +1,488 @@
+/*-------------------------------------------------------------------------
+ * conflict.c
+ *    Support routines for logging conflicts.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *   src/backend/replication/logical/conflict.c
+ *
+ * This file contains the code for logging conflicts on the subscriber during
+ * logical replication.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/commit_ts.h"
+#include "access/tableam.h"
+#include "executor/executor.h"
+#include "replication/conflict.h"
+#include "replication/logicalrelation.h"
+#include "storage/lmgr.h"
+#include "utils/lsyscache.h"
+
+static const char *const ConflictTypeNames[] = {
+   [CT_INSERT_EXISTS] = "insert_exists",
+   [CT_UPDATE_DIFFER] = "update_differ",
+   [CT_UPDATE_EXISTS] = "update_exists",
+   [CT_UPDATE_MISSING] = "update_missing",
+   [CT_DELETE_DIFFER] = "delete_differ",
+   [CT_DELETE_MISSING] = "delete_missing"
+};
+
+static int errcode_apply_conflict(ConflictType type);
+static int errdetail_apply_conflict(EState *estate,
+                                    ResultRelInfo *relinfo,
+                                    ConflictType type,
+                                    TupleTableSlot *searchslot,
+                                    TupleTableSlot *localslot,
+                                    TupleTableSlot *remoteslot,
+                                    Oid indexoid, TransactionId localxmin,
+                                    RepOriginId localorigin,
+                                    TimestampTz localts);
+static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
+                                      ConflictType type,
+                                      TupleTableSlot *searchslot,
+                                      TupleTableSlot *localslot,
+                                      TupleTableSlot *remoteslot,
+                                      Oid indexoid);
+static char *build_index_value_desc(EState *estate, Relation localrel,
+                                   TupleTableSlot *slot, Oid indexoid);
+
+/*
+ * Get the xmin and commit timestamp data (origin and timestamp) associated
+ * with the provided local tuple.
+ *
+ * Return true if the commit timestamp data was found, false otherwise.
+ */
+bool
+GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
+                       RepOriginId *localorigin, TimestampTz *localts)
+{
+   Datum       xminDatum;
+   bool        isnull;
+
+   xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
+                               &isnull);
+   *xmin = DatumGetTransactionId(xminDatum);
+   Assert(!isnull);
+
+   /*
+    * The commit timestamp data is not available if track_commit_timestamp is
+    * disabled.
+    */
+   if (!track_commit_timestamp)
+   {
+       *localorigin = InvalidRepOriginId;
+       *localts = 0;
+       return false;
+   }
+
+   return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
+}
+
+/*
+ * This function is used to report a conflict while applying replication
+ * changes.
+ *
+ * 'searchslot' should contain the tuple used to search for the local tuple
+ * to be updated or deleted.
+ *
+ * 'localslot' should contain the existing local tuple, if any, that conflicts
+ * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
+ * transaction information related to this existing local tuple.
+ *
+ * 'remoteslot' should contain the remote new tuple, if any.
+ *
+ * The 'indexoid' represents the OID of the unique index that triggered the
+ * constraint violation error. We use this to report the key values of the
+ * conflicting tuple.
+ *
+ * The caller must ensure that the index with the OID 'indexoid' is locked so
+ * that we can fetch and display the conflicting key value.
+ */
+void
+ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
+                   ConflictType type, TupleTableSlot *searchslot,
+                   TupleTableSlot *localslot, TupleTableSlot *remoteslot,
+                   Oid indexoid, TransactionId localxmin,
+                   RepOriginId localorigin, TimestampTz localts)
+{
+   Relation    localrel = relinfo->ri_RelationDesc;
+
+   Assert(!OidIsValid(indexoid) ||
+          CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+   ereport(elevel,
+           errcode_apply_conflict(type),
+           errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+                  get_namespace_name(RelationGetNamespace(localrel)),
+                  RelationGetRelationName(localrel),
+                  ConflictTypeNames[type]),
+           errdetail_apply_conflict(estate, relinfo, type, searchslot,
+                                    localslot, remoteslot, indexoid,
+                                    localxmin, localorigin, localts));
+}
+
+/*
+ * Find all unique indexes to check for a conflict and store them into
+ * ResultRelInfo.
+ */
+void
+InitConflictIndexes(ResultRelInfo *relInfo)
+{
+   List       *uniqueIndexes = NIL;
+
+   for (int i = 0; i < relInfo->ri_NumIndices; i++)
+   {
+       Relation    indexRelation = relInfo->ri_IndexRelationDescs[i];
+
+       if (indexRelation == NULL)
+           continue;
+
+       /* Detect conflict only for unique indexes */
+       if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
+           continue;
+
+       /* Don't support conflict detection for deferrable index */
+       if (!indexRelation->rd_index->indimmediate)
+           continue;
+
+       uniqueIndexes = lappend_oid(uniqueIndexes,
+                                   RelationGetRelid(indexRelation));
+   }
+
+   relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
+}
+
+/*
+ * Add SQLSTATE error code to the current conflict report.
+ */
+static int
+errcode_apply_conflict(ConflictType type)
+{
+   switch (type)
+   {
+       case CT_INSERT_EXISTS:
+       case CT_UPDATE_EXISTS:
+           return errcode(ERRCODE_UNIQUE_VIOLATION);
+       case CT_UPDATE_DIFFER:
+       case CT_UPDATE_MISSING:
+       case CT_DELETE_DIFFER:
+       case CT_DELETE_MISSING:
+           return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
+   }
+
+   Assert(false);
+   return 0;                   /* silence compiler warning */
+}
+
+/*
+ * Add an errdetail() line showing conflict detail.
+ *
+ * The DETAIL line consists of two parts:
+ * 1. Explanation of the conflict type, including the origin and commit
+ *    timestamp of the existing local tuple.
+ * 2. Display of conflicting key, existing local tuple, remote new tuple, and
+ *    replica identity columns, if any. The remote old tuple is excluded as its
+ *    information is covered in the replica identity columns.
+ */
+static int
+errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
+                        ConflictType type, TupleTableSlot *searchslot,
+                        TupleTableSlot *localslot, TupleTableSlot *remoteslot,
+                        Oid indexoid, TransactionId localxmin,
+                        RepOriginId localorigin, TimestampTz localts)
+{
+   StringInfoData err_detail;
+   char       *val_desc;
+   char       *origin_name;
+
+   initStringInfo(&err_detail);
+
+   /* First, construct a detailed message describing the type of conflict */
+   switch (type)
+   {
+       case CT_INSERT_EXISTS:
+       case CT_UPDATE_EXISTS:
+           Assert(OidIsValid(indexoid));
+
+           if (localts)
+           {
+               if (localorigin == InvalidRepOriginId)
+                   appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
+                                    get_rel_name(indexoid),
+                                    localxmin, timestamptz_to_str(localts));
+               else if (replorigin_by_oid(localorigin, true, &origin_name))
+                   appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
+                                    get_rel_name(indexoid), origin_name,
+                                    localxmin, timestamptz_to_str(localts));
+
+               /*
+                * The origin that modified this row has been removed. This
+                * can happen if the origin was created by a different apply
+                * worker and its associated subscription and origin were
+                * dropped after updating the row, or if the origin was
+                * manually dropped by the user.
+                */
+               else
+                   appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
+                                    get_rel_name(indexoid),
+                                    localxmin, timestamptz_to_str(localts));
+           }
+           else
+               appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
+                                get_rel_name(indexoid), localxmin);
+
+           break;
+
+       case CT_UPDATE_DIFFER:
+           if (localorigin == InvalidRepOriginId)
+               appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
+                                localxmin, timestamptz_to_str(localts));
+           else if (replorigin_by_oid(localorigin, true, &origin_name))
+               appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
+                                origin_name, localxmin, timestamptz_to_str(localts));
+
+           /* The origin that modified this row has been removed. */
+           else
+               appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
+                                localxmin, timestamptz_to_str(localts));
+
+           break;
+
+       case CT_UPDATE_MISSING:
+           appendStringInfo(&err_detail, _("Could not find the row to be updated."));
+           break;
+
+       case CT_DELETE_DIFFER:
+           if (localorigin == InvalidRepOriginId)
+               appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
+                                localxmin, timestamptz_to_str(localts));
+           else if (replorigin_by_oid(localorigin, true, &origin_name))
+               appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
+                                origin_name, localxmin, timestamptz_to_str(localts));
+
+           /* The origin that modified this row has been removed. */
+           else
+               appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
+                                localxmin, timestamptz_to_str(localts));
+
+           break;
+
+       case CT_DELETE_MISSING:
+           appendStringInfo(&err_detail, _("Could not find the row to be deleted."));
+           break;
+   }
+
+   Assert(err_detail.len > 0);
+
+   val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
+                                        localslot, remoteslot, indexoid);
+
+   /*
+    * Next, append the key values, existing local tuple, remote tuple and
+    * replica identity columns after the message.
+    */
+   if (val_desc)
+       appendStringInfo(&err_detail, "\n%s", val_desc);
+
+   return errdetail_internal("%s", err_detail.data);
+}
+
+/*
+ * Helper function to build the additional details for conflicting key,
+ * existing local tuple, remote tuple, and replica identity columns.
+ *
+ * If the return value is NULL, it indicates that the current user lacks
+ * permissions to view the columns involved.
+ */
+static char *
+build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
+                         ConflictType type,
+                         TupleTableSlot *searchslot,
+                         TupleTableSlot *localslot,
+                         TupleTableSlot *remoteslot,
+                         Oid indexoid)
+{
+   Relation    localrel = relinfo->ri_RelationDesc;
+   Oid         relid = RelationGetRelid(localrel);
+   TupleDesc   tupdesc = RelationGetDescr(localrel);
+   StringInfoData tuple_value;
+   char       *desc = NULL;
+
+   Assert(searchslot || localslot || remoteslot);
+
+   initStringInfo(&tuple_value);
+
+   /*
+    * Report the conflicting key values in the case of a unique constraint
+    * violation.
+    */
+   if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+   {
+       Assert(OidIsValid(indexoid) && localslot);
+
+       desc = build_index_value_desc(estate, localrel, localslot, indexoid);
+
+       if (desc)
+           appendStringInfo(&tuple_value, _("Key %s"), desc);
+   }
+
+   if (localslot)
+   {
+       /*
+        * The 'modifiedCols' only applies to the new tuple, hence we pass
+        * NULL for the existing local tuple.
+        */
+       desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
+                                            NULL, 64);
+
+       if (desc)
+       {
+           if (tuple_value.len > 0)
+           {
+               appendStringInfoString(&tuple_value, "; ");
+               appendStringInfo(&tuple_value, _("existing local tuple %s"),
+                                desc);
+           }
+           else
+           {
+               appendStringInfo(&tuple_value, _("Existing local tuple %s"),
+                                desc);
+           }
+       }
+   }
+
+   if (remoteslot)
+   {
+       Bitmapset  *modifiedCols;
+
+       /*
+        * Although logical replication doesn't maintain the bitmap for the
+        * columns being inserted, we still use it to create 'modifiedCols'
+        * for consistency with other calls to ExecBuildSlotValueDescription.
+        *
+        * Note that generated columns are formed locally on the subscriber.
+        */
+       modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
+                                ExecGetUpdatedCols(relinfo, estate));
+       desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
+                                            modifiedCols, 64);
+
+       if (desc)
+       {
+           if (tuple_value.len > 0)
+           {
+               appendStringInfoString(&tuple_value, "; ");
+               appendStringInfo(&tuple_value, _("remote tuple %s"), desc);
+           }
+           else
+           {
+               appendStringInfo(&tuple_value, _("Remote tuple %s"), desc);
+           }
+       }
+   }
+
+   if (searchslot)
+   {
+       /*
+        * Note that while an index other than the replica identity may be
+        * used (see IsIndexUsableForReplicaIdentityFull for details) to find
+        * the tuple when applying an update or delete, such an index scan
+        * may not result in a unique tuple, and we still compare the complete
+        * tuple in such cases; thus, such indexes are not used here.
+        */
+       Oid         replica_index = GetRelationIdentityOrPK(localrel);
+
+       Assert(type != CT_INSERT_EXISTS);
+
+       /*
+        * If the table has a valid replica identity index, build the index
+        * key value string. Otherwise, construct the full tuple value for
+        * REPLICA IDENTITY FULL cases.
+        */
+       if (OidIsValid(replica_index))
+           desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
+       else
+           desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
+
+       if (desc)
+       {
+           if (tuple_value.len > 0)
+           {
+               appendStringInfoString(&tuple_value, "; ");
+               appendStringInfo(&tuple_value, OidIsValid(replica_index)
+                                ? _("replica identity %s")
+                                : _("replica identity full %s"), desc);
+           }
+           else
+           {
+               appendStringInfo(&tuple_value, OidIsValid(replica_index)
+                                ? _("Replica identity %s")
+                                : _("Replica identity full %s"), desc);
+           }
+       }
+   }
+
+   if (tuple_value.len == 0)
+       return NULL;
+
+   appendStringInfoChar(&tuple_value, '.');
+   return tuple_value.data;
+}
+
+/*
+ * Helper function to construct a string describing the contents of an index
+ * entry. See BuildIndexValueDescription for details.
+ *
+ * The caller must ensure that the index with the OID 'indexoid' is locked so
+ * that we can fetch and display the conflicting key value.
+ */
+static char *
+build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
+                      Oid indexoid)
+{
+   char       *index_value;
+   Relation    indexDesc;
+   Datum       values[INDEX_MAX_KEYS];
+   bool        isnull[INDEX_MAX_KEYS];
+   TupleTableSlot *tableslot = slot;
+
+   if (!tableslot)
+       return NULL;
+
+   Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+   indexDesc = index_open(indexoid, NoLock);
+
+   /*
+    * If the slot is a virtual slot, copy it into a heap tuple slot as
+    * FormIndexDatum only works with heap tuple slots.
+    */
+   if (TTS_IS_VIRTUAL(slot))
+   {
+       tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+       tableslot = ExecCopySlot(tableslot, slot);
+   }
+
+   /*
+    * Initialize ecxt_scantuple for potential use in FormIndexDatum when
+    * index expressions are present.
+    */
+   GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+   /*
+    * The values/nulls arrays passed to BuildIndexValueDescription should be
+    * the results of FormIndexDatum, which are the "raw" input to the index
+    * AM.
+    */
+   FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+
+   index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+   index_close(indexDesc, NoLock);
+
+   return index_value;
+}
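
To see the DETAIL line built by errdetail_apply_conflict() in practice, here
is a sketch for update_differ (hypothetical names; the non-key column "b" is
assumed; requires track_commit_timestamp = on on the subscriber):

-- Subscriber: modify a replicated row locally, so its xmin carries a
-- local (invalid) replication origin.
test_sub=# UPDATE test SET b = 'changed locally' WHERE c = 1;

-- Publisher: update the same row; the apply worker emits a LOG entry with
-- conflict=update_differ, including the local transaction's xmin, origin,
-- and commit timestamp, and then applies the remote update anyway.
test_pub=# UPDATE test SET b = 'changed remotely' WHERE c = 1;
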
index 3dec36a6de5f5ebcdb76567f2806c178b315666d..3d36249d8ad42a8668bf3d805b12f8751d08e9a1 100644 (file)
@@ -2,6 +2,7 @@
 
 backend_sources += files(
   'applyparallelworker.c',
+  'conflict.c',
   'decode.c',
   'launcher.c',
   'logical.c',
index 245e9be6f27df862ab033d5ea242d5ea3b9c8201..cdea6295d8a82f59cf5c72d0099c64f75172bc45 100644 (file)
 #include "postmaster/bgworker.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/walwriter.h"
+#include "replication/conflict.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
@@ -2481,7 +2482,8 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
    EState     *estate = edata->estate;
 
    /* We must open indexes here. */
-   ExecOpenIndices(relinfo, false);
+   ExecOpenIndices(relinfo, true);
+   InitConflictIndexes(relinfo);
 
    /* Do the insert. */
    TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
@@ -2669,13 +2671,12 @@ apply_handle_update_internal(ApplyExecutionData *edata,
    MemoryContext oldctx;
 
    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
-   ExecOpenIndices(relinfo, false);
+   ExecOpenIndices(relinfo, true);
 
    found = FindReplTupleInLocalRel(edata, localrel,
                                    &relmapentry->remoterel,
                                    localindexoid,
                                    remoteslot, &localslot);
-   ExecClearTuple(remoteslot);
 
    /*
     * Tuple found.
@@ -2684,6 +2685,28 @@ apply_handle_update_internal(ApplyExecutionData *edata,
     */
    if (found)
    {
+       RepOriginId localorigin;
+       TransactionId localxmin;
+       TimestampTz localts;
+
+       /*
+        * Report the conflict if the tuple was modified by a different
+        * origin.
+        */
+       if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+           localorigin != replorigin_session_origin)
+       {
+           TupleTableSlot *newslot;
+
+           /* Store the new tuple for conflict reporting */
+           newslot = table_slot_create(localrel, &estate->es_tupleTable);
+           slot_store_data(newslot, relmapentry, newtup);
+
+           ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_DIFFER,
+                               remoteslot, localslot, newslot,
+                               InvalidOid, localxmin, localorigin, localts);
+       }
+
        /* Process and store remote tuple in the slot */
        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
        slot_modify_data(remoteslot, localslot, relmapentry, newtup);
@@ -2691,6 +2714,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 
        EvalPlanQualSetSlot(&epqstate, remoteslot);
 
+       InitConflictIndexes(relinfo);
+
        /* Do the actual update. */
        TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
        ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
@@ -2698,16 +2723,19 @@ apply_handle_update_internal(ApplyExecutionData *edata,
    }
    else
    {
+       TupleTableSlot *newslot = localslot;
+
+       /* Store the new tuple for conflict reporting */
+       slot_store_data(newslot, relmapentry, newtup);
+
        /*
         * The tuple to be updated could not be found.  Do nothing except for
         * emitting a log message.
-        *
-        * XXX should this be promoted to ereport(LOG) perhaps?
         */
-       elog(DEBUG1,
-            "logical replication did not find row to be updated "
-            "in replication target relation \"%s\"",
-            RelationGetRelationName(localrel));
+       ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
+                           remoteslot, NULL, newslot,
+                           InvalidOid, InvalidTransactionId,
+                           InvalidRepOriginId, 0);
    }
 
    /* Cleanup. */
@@ -2830,6 +2858,20 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
    /* If found delete it. */
    if (found)
    {
+       RepOriginId localorigin;
+       TransactionId localxmin;
+       TimestampTz localts;
+
+       /*
+        * Report the conflict if the tuple was modified by a different
+        * origin.
+        */
+       if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+           localorigin != replorigin_session_origin)
+           ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_DIFFER,
+                               remoteslot, localslot, NULL,
+                               InvalidOid, localxmin, localorigin, localts);
+
        EvalPlanQualSetSlot(&epqstate, localslot);
 
        /* Do the actual delete. */
@@ -2841,13 +2883,11 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
        /*
         * The tuple to be deleted could not be found.  Do nothing except for
         * emitting a log message.
-        *
-        * XXX should this be promoted to ereport(LOG) perhaps?
         */
-       elog(DEBUG1,
-            "logical replication did not find row to be deleted "
-            "in replication target relation \"%s\"",
-            RelationGetRelationName(localrel));
+       ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
+                           remoteslot, NULL, NULL,
+                           InvalidOid, InvalidTransactionId,
+                           InvalidRepOriginId, 0);
    }
 
    /* Cleanup. */
@@ -3015,6 +3055,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                Relation    partrel_new;
                bool        found;
                EPQState    epqstate;
+               RepOriginId localorigin;
+               TransactionId localxmin;
+               TimestampTz localts;
 
                /* Get the matching local tuple from the partition. */
                found = FindReplTupleInLocalRel(edata, partrel,
@@ -3023,19 +3066,43 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                                                remoteslot_part, &localslot);
                if (!found)
                {
+                   TupleTableSlot *newslot = localslot;
+
+                   /* Store the new tuple for conflict reporting */
+                   slot_store_data(newslot, part_entry, newtup);
+
                    /*
                     * The tuple to be updated could not be found.  Do nothing
                     * except for emitting a log message.
-                    *
-                    * XXX should this be promoted to ereport(LOG) perhaps?
                     */
-                   elog(DEBUG1,
-                        "logical replication did not find row to be updated "
-                        "in replication target relation's partition \"%s\"",
-                        RelationGetRelationName(partrel));
+                   ReportApplyConflict(estate, partrelinfo,
+                                       LOG, CT_UPDATE_MISSING,
+                                       remoteslot_part, NULL, newslot,
+                                       InvalidOid, InvalidTransactionId,
+                                       InvalidRepOriginId, 0);
+
                    return;
                }
 
+               /*
+                * Report the conflict if the tuple was modified by a
+                * different origin.
+                */
+               if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+                   localorigin != replorigin_session_origin)
+               {
+                   TupleTableSlot *newslot;
+
+                   /* Store the new tuple for conflict reporting */
+                   newslot = table_slot_create(partrel, &estate->es_tupleTable);
+                   slot_store_data(newslot, part_entry, newtup);
+
+                   ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_DIFFER,
+                                       remoteslot_part, localslot, newslot,
+                                       InvalidOid, localxmin, localorigin,
+                                       localts);
+               }
+
                /*
                 * Apply the update to the local tuple, putting the result in
                 * remoteslot_part.
@@ -3046,7 +3113,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                MemoryContextSwitchTo(oldctx);
 
                EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
-               ExecOpenIndices(partrelinfo, false);
 
                /*
                 * Does the updated tuple still satisfy the current
@@ -3063,6 +3129,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                     * work already done above to find the local tuple in the
                     * partition.
                     */
+                   ExecOpenIndices(partrelinfo, true);
+                   InitConflictIndexes(partrelinfo);
+
                    EvalPlanQualSetSlot(&epqstate, remoteslot_part);
                    TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
                                          ACL_UPDATE);
@@ -3110,6 +3179,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                                             get_namespace_name(RelationGetNamespace(partrel_new)),
                                             RelationGetRelationName(partrel_new));
 
+                   ExecOpenIndices(partrelinfo, false);
+
                    /* DELETE old tuple found in the old partition. */
                    EvalPlanQualSetSlot(&epqstate, localslot);
                    TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
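
Unlike the differ and exists cases, the update_missing and delete_missing
paths above log at LOG level and skip the operation; a sketch (hypothetical
names, as before):

-- Subscriber: remove a row locally so an incoming change has no target.
test_sub=# DELETE FROM test WHERE c = 1;

-- Publisher: delete the same row; the subscriber logs
-- conflict=delete_missing and skips the operation instead of erroring.
test_pub=# DELETE FROM test WHERE c = 1;
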
index 046a7fb69b05e7f8689b0ea5f784c03496d56352..69c3ebff00a0095002cd3352e72f266fd74121bd 100644 (file)
@@ -228,6 +228,10 @@ extern void ExecPartitionCheckEmitError(ResultRelInfo *resultRelInfo,
                                        TupleTableSlot *slot, EState *estate);
 extern void ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
                                 TupleTableSlot *slot, EState *estate);
+extern char *ExecBuildSlotValueDescription(Oid reloid, TupleTableSlot *slot,
+                                          TupleDesc tupdesc,
+                                          Bitmapset *modifiedCols,
+                                          int maxfieldlen);
 extern LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo);
 extern ExecRowMark *ExecFindRowMark(EState *estate, Index rti, bool missing_ok);
 extern ExecAuxRowMark *ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist);
@@ -643,6 +647,7 @@ extern List *ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
 extern bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo,
                                      TupleTableSlot *slot,
                                      EState *estate, ItemPointer conflictTid,
+                                     ItemPointer tupleid,
                                      List *arbiterIndexes);
 extern void check_exclusion_constraint(Relation heap, Relation index,
                                       IndexInfo *indexInfo,
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
new file mode 100644 (file)
index 0000000..02cb84d
--- /dev/null
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ * conflict.h
+ *    Exports for conflicts logging.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONFLICT_H
+#define CONFLICT_H
+
+#include "nodes/execnodes.h"
+#include "utils/timestamp.h"
+
+/*
+ * Conflict types that could occur while applying remote changes.
+ */
+typedef enum
+{
+   /* The row to be inserted violates a unique constraint */
+   CT_INSERT_EXISTS,
+
+   /* The row to be updated was modified by a different origin */
+   CT_UPDATE_DIFFER,
+
+   /* The updated row value violates a unique constraint */
+   CT_UPDATE_EXISTS,
+
+   /* The row to be updated is missing */
+   CT_UPDATE_MISSING,
+
+   /* The row to be deleted was modified by a different origin */
+   CT_DELETE_DIFFER,
+
+   /* The row to be deleted is missing */
+   CT_DELETE_MISSING,
+
+   /*
+    * Other conflicts, such as exclusion constraint violations, involve more
+    * complex rules than simple equality checks. These conflicts are left for
+    * future improvements.
+    */
+} ConflictType;
+
+extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
+                                   TransactionId *xmin,
+                                   RepOriginId *localorigin,
+                                   TimestampTz *localts);
+extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
+                               int elevel, ConflictType type,
+                               TupleTableSlot *searchslot,
+                               TupleTableSlot *localslot,
+                               TupleTableSlot *remoteslot,
+                               Oid indexoid, TransactionId localxmin,
+                               RepOriginId localorigin, TimestampTz localts);
+extern void InitConflictIndexes(ResultRelInfo *relInfo);
+
+#endif
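
To illustrate how these declarations fit together: a minimal sketch of
reporting an update_missing conflict from the apply side, assuming the
estate, relinfo, remoteslot, and newslot variables are supplied by the
caller.  For the *_differ and *_exists cases the last three arguments would
instead be filled in via GetTupleTransactionInfo() on the conflicting local
tuple:

    /* No local tuple was found, so there is no origin info to report. */
    ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
                        remoteslot,       /* searchslot: key values searched for */
                        NULL,             /* localslot: no local tuple */
                        newslot,          /* remoteslot: incoming new values */
                        InvalidOid,       /* indexoid: no index involved */
                        InvalidTransactionId, InvalidRepOriginId, 0);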
index 471e9819628f51faee700bd5ddad8da84e0cfaf5..d377e7ae2b61cbabb9ecbd06fe7900c66c373642 100644 (file)
@@ -331,13 +331,8 @@ is( $result, qq(1|bar
 2|baz),
    'update works with REPLICA IDENTITY FULL and a primary key');
 
-# Check that subscriber handles cases where update/delete target tuple
-# is missing.  We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
-$node_subscriber->reload;
-
 $node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk");
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_full WHERE a = 25");
 
 # Note that the current location of the log file is not grabbed immediately
 # after reloading the configuration, but after sending one SQL command to
@@ -346,16 +341,21 @@ my $log_location = -s $node_subscriber->logfile;
 
 $node_publisher->safe_psql('postgres',
    "UPDATE tab_full_pk SET b = 'quux' WHERE a = 1");
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab_full SET a = a + 1 WHERE a = 25");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_full_pk WHERE a = 2");
 
 $node_publisher->wait_for_catchup('tap_sub');
 
 my $logfile = slurp_file($node_subscriber->logfile, $log_location);
 ok( $logfile =~
-     qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/,
+     qr/conflict detected on relation "public.tab_full_pk": conflict=update_missing.*\n.*DETAIL:.* Could not find the row to be updated.*\n.*Remote tuple \(1, quux\); replica identity \(a\)=\(1\)/m,
+   'update target row is missing');
+ok( $logfile =~
+     qr/conflict detected on relation "public.tab_full": conflict=update_missing.*\n.*DETAIL:.* Could not find the row to be updated.*\n.*Remote tuple \(26\); replica identity full \(25\)/m,
   'update target row is missing (replica identity full)');
 ok( $logfile =~
-     qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/,
+     qr/conflict detected on relation "public.tab_full_pk": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(2\)/m,
    'delete target row is missing');
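
Reconstructed from the regexes above, the new subscriber log entries look
roughly like this (tuple values, spacing, and line wrapping are illustrative):

    LOG:  conflict detected on relation "public.tab_full_pk": conflict=update_missing
    DETAIL:  Could not find the row to be updated.
            Remote tuple (1, quux); replica identity (a)=(1).
    LOG:  conflict detected on relation "public.tab_full_pk": conflict=delete_missing
    DETAIL:  Could not find the row to be deleted.
            Replica identity (a)=(2).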
 
 $node_subscriber->append_conf('postgresql.conf',
@@ -517,7 +517,7 @@ is($result, qq(1052|1|1002),
 
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), min(a), max(a) FROM tab_full");
-is($result, qq(21|0|100), 'check replicated insert after alter publication');
+is($result, qq(19|0|100), 'check replicated insert after alter publication');
 
 # check restart on rename
 $oldpid = $node_publisher->safe_psql('postgres',
index 29580525a97d8f86f29d702ee0cada0426fe973a..cf91542ed000137bffe34bb201a559049ed4ef3a 100644 (file)
@@ -343,13 +343,6 @@ $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1");
 is($result, qq(), 'truncate of tab1 replicated');
 
-# Check that subscriber handles cases where update/delete target tuple
-# is missing.  We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber1->append_conf('postgresql.conf',
-   "log_min_messages = debug1");
-$node_subscriber1->reload;
-
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')");
 
@@ -372,22 +365,18 @@ $node_publisher->wait_for_catchup('sub2');
 
 my $logfile = slurp_file($node_subscriber1->logfile(), $log_location);
 ok( $logfile =~
-     qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/,
+     qr/conflict detected on relation "public.tab1_2_2": conflict=update_missing.*\n.*DETAIL:.* Could not find the row to be updated.*\n.*Remote tuple \(null, 4, quux\); replica identity \(a\)=\(4\)/,
    'update target row is missing in tab1_2_2');
 ok( $logfile =~
-     qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/,
+     qr/conflict detected on relation "public.tab1_1": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(1\)/,
    'delete target row is missing in tab1_1');
 ok( $logfile =~
-     qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/,
+     qr/conflict detected on relation "public.tab1_2_2": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(4\)/,
    'delete target row is missing in tab1_2_2');
 ok( $logfile =~
-     qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/,
+     qr/conflict detected on relation "public.tab1_def": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(10\)/,
    'delete target row is missing in tab1_def');
 
-$node_subscriber1->append_conf('postgresql.conf',
-   "log_min_messages = warning");
-$node_subscriber1->reload;
-
 # Tests for replication using root table identity and schema
 
 # publisher
@@ -773,13 +762,6 @@ pub_tab2|3|yyy
 pub_tab2|5|zzz
 xxx_c|6|aaa), 'inserts into tab2 replicated');
 
-# Check that subscriber handles cases where update/delete target tuple
-# is missing.  We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber1->append_conf('postgresql.conf',
-   "log_min_messages = debug1");
-$node_subscriber1->reload;
-
 $node_subscriber1->safe_psql('postgres', "DELETE FROM tab2");
 
 # Note that the current location of the log file is not grabbed immediately
@@ -796,15 +778,34 @@ $node_publisher->wait_for_catchup('sub2');
 
 $logfile = slurp_file($node_subscriber1->logfile(), $log_location);
 ok( $logfile =~
-     qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/,
+     qr/conflict detected on relation "public.tab2_1": conflict=update_missing.*\n.*DETAIL:.* Could not find the row to be updated.*\n.*Remote tuple \(pub_tab2, quux, 5\); replica identity \(a\)=\(5\)/,
    'update target row is missing in tab2_1');
 ok( $logfile =~
-     qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
+     qr/conflict detected on relation "public.tab2_1": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(1\)/,
    'delete target row is missing in tab2_1');
 
+# Enable track_commit_timestamp to detect the conflict when attempting
+# to update a row that was previously modified by a different origin.
+$node_subscriber1->append_conf('postgresql.conf',
+   'track_commit_timestamp = on');
+$node_subscriber1->restart;
+
+$node_subscriber1->safe_psql('postgres',
+   "INSERT INTO tab2 VALUES (3, 'yyy')");
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab2 SET b = 'quux' WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+
+$logfile = slurp_file($node_subscriber1->logfile(), $log_location);
+ok( $logfile =~
+     qr/conflict detected on relation "public.tab2_1": conflict=update_differ.*\n.*DETAIL:.* Updating the row that was modified locally in transaction [0-9]+ at .*\n.*Existing local tuple \(yyy, null, 3\); remote tuple \(pub_tab2, quux, 3\); replica identity \(a\)=\(3\)/,
+   'updating a tuple that was modified by a different origin');
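+
+# The update_differ pattern above matches the variant of the DETAIL line used
+# when the row's last modification was a local change rather than one from
+# another origin; roughly (XID, timestamp, and values are illustrative):
+#
+#   LOG:  conflict detected on relation "public.tab2_1": conflict=update_differ
+#   DETAIL:  Updating the row that was modified locally in transaction 748 at 2024-08-20 09:00:00.000000+00.
+#           Existing local tuple (yyy, null, 3); remote tuple (pub_tab2, quux, 3); replica identity (a)=(3).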
+
+# The remaining tests no longer exercise conflict detection.
 $node_subscriber1->append_conf('postgresql.conf',
-   "log_min_messages = warning");
-$node_subscriber1->reload;
+   'track_commit_timestamp = off');
+$node_subscriber1->restart;
 
 # Test that replication continues to work correctly after altering the
 # partition of a partitioned target table.
index 0ab57a4b5b0e46c139071862bdf17bf80ef4ffe2..2f099a74f39b5b5e6052b35c382fcfc3bece0ffe 100644 (file)
@@ -30,7 +30,7 @@ sub test_skip_lsn
    # ERROR with its CONTEXT when retrieving this information.
    my $contents = slurp_file($node_subscriber->logfile, $offset);
    $contents =~
-     qr/duplicate key value violates unique constraint "tbl_pkey".*\n.*DETAIL:.*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
+     qr/conflict detected on relation "public.tbl".*\n.*DETAIL:.* Key already exists in unique index "tbl_pkey", modified by .*origin.* transaction \d+ at .*\n.*Key \(i\)=\(\d+\); existing local tuple .*; remote tuple .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
      or die "could not get error-LSN";
    my $lsn = $1;
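
The finish LSN captured here is what the rest of test_skip_lsn() feeds to the
skip machinery; conceptually it boils down to the following (a sketch, not the
test's literal code, and the subscription name is illustrative):

    $node_subscriber->safe_psql('postgres',
        "ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn')");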
 
@@ -83,6 +83,7 @@ $node_subscriber->append_conf(
    'postgresql.conf',
    qq[
 max_prepared_transactions = 10
+track_commit_timestamp = on
 ]);
 $node_subscriber->start;
 
@@ -93,6 +94,7 @@ $node_publisher->safe_psql(
    'postgres',
    qq[
 CREATE TABLE tbl (i INT, t BYTEA);
+ALTER TABLE tbl REPLICA IDENTITY FULL;
 INSERT INTO tbl VALUES (1, NULL);
 ]);
 $node_subscriber->safe_psql(
@@ -144,13 +146,14 @@ COMMIT;
 test_skip_lsn($node_publisher, $node_subscriber,
    "(2, NULL)", "2", "test skipping transaction");
 
-# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and
-# PREPARE the transaction, raising an error. Then skip the transaction.
+# Test for PREPARE and COMMIT PREPARED. Update the data and PREPARE the
+# transaction, raising an error on the subscriber due to a violation of the
+# unique constraint on tbl. Then skip the transaction.
 $node_publisher->safe_psql(
    'postgres',
    qq[
 BEGIN;
-INSERT INTO tbl VALUES (1, NULL);
+UPDATE tbl SET i = 2;
 PREPARE TRANSACTION 'gtx';
 COMMIT PREPARED 'gtx';
 ]);
index 056561f00841ca4fa1e6f12c25c148e060db06e1..01536a13e73af6ca4d947cfe2acec3fd475e2c5b 100644 (file)
@@ -27,9 +27,14 @@ my $stderr;
 my $node_A = PostgreSQL::Test::Cluster->new('node_A');
 $node_A->init(allows_streaming => 'logical');
 $node_A->start;
+
 # node_B
 my $node_B = PostgreSQL::Test::Cluster->new('node_B');
 $node_B->init(allows_streaming => 'logical');
+
+# Enable track_commit_timestamp to detect conflicts when attempting to
+# update or delete a row that was previously modified by a different origin.
+$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = on');
 $node_B->start;
 
 # Create table on node_A
@@ -139,6 +144,48 @@ is($result, qq(),
    'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
 );
 
+###############################################################################
+# Check that conflicts can be detected when attempting to update or
+# delete a row that was previously modified by a different origin.
+###############################################################################
+
+$node_B->safe_psql('postgres', "DELETE FROM tab;");
+
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
+
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(32), 'The node_A data replicated to node_B');
+
+# The update from node_C should modify the row on node_B that was inserted by node_A.
+$node_C->safe_psql('postgres', "UPDATE tab SET a = 33 WHERE a = 32;");
+
+$node_B->wait_for_log(
+   qr/conflict detected on relation "public.tab": conflict=update_differ.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*\n.*Existing local tuple \(32\); remote tuple \(33\); replica identity \(a\)=\(32\)/
+);
+
+$node_B->safe_psql('postgres', "DELETE FROM tab;");
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (33);");
+
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(33), 'The node_A data replicated to node_B');
+
+# The delete from node_C should remove the row on node_B that was inserted by node_A.
+$node_C->safe_psql('postgres', "DELETE FROM tab WHERE a = 33;");
+
+$node_B->wait_for_log(
+   qr/conflict detected on relation "public.tab": conflict=delete_differ.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*\n.*Existing local tuple \(33\); replica identity \(a\)=\(33\)/
+);
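+
+# When the previous modification came from another node rather than from a
+# local transaction, the DETAIL line names the remote origin; roughly (origin
+# name, XID, timestamp, and values are illustrative):
+#
+#   LOG:  conflict detected on relation "public.tab": conflict=delete_differ
+#   DETAIL:  Deleting the row that was modified by a different origin "pg_16398" in transaction 754 at 2024-08-20 09:00:00.000000+00.
+#           Existing local tuple (33); replica identity (a)=(33).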
+
+# The remaining tests no longer exercise conflict detection.
+$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = off');
+$node_B->restart;
+
 ###############################################################################
 # Specifying origin = NONE indicates that the publisher should only replicate the
 # changes that are generated locally from node_B, but in this case since the
index 547d14b3e7ccd92d233c17a68c132581c13da79a..6d424c89186cfb17b3b63b72dd7eefa3bd32204f 100644 (file)
@@ -467,6 +467,7 @@ ConditionVariableMinimallyPadded
 ConditionalStack
 ConfigData
 ConfigVariable
+ConflictType
 ConnCacheEntry
 ConnCacheKey
 ConnParams