file_fdw: Add on_error and log_verbosity options to file_fdw.
authorFujii Masao <fujii@postgresql.org>
Thu, 3 Oct 2024 06:57:32 +0000 (15:57 +0900)
committerFujii Masao <fujii@postgresql.org>
Thu, 3 Oct 2024 06:57:32 +0000 (15:57 +0900)
In v17, the on_error and log_verbosity options were introduced for
the COPY command. This commit extends support for these options
to file_fdw.

Setting on_error = 'ignore' for a file_fdw foreign table allows users
to query it without errors, even when the input file contains
malformed rows, by skipping the problematic rows.

Both on_error and log_verbosity options apply to SELECT and ANALYZE
operations on file_fdw foreign tables.

Author: Atsushi Torikoshi
Reviewed-by: Masahiko Sawada, Fujii Masao
Discussion: https://postgr.es/m/ab59dad10490ea3734cf022b16c24cfd@oss.nttdata.com

contrib/file_fdw/expected/file_fdw.out
contrib/file_fdw/file_fdw.c
contrib/file_fdw/sql/file_fdw.sql
doc/src/sgml/file-fdw.sgml

index 86c148a86ba3afa8d916c6323c0ba99f10f6c17c..593fdc782e37d6ecc739f505e65a77474eb3cb88 100644 (file)
@@ -206,6 +206,25 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 SELECT * FROM agg_bad;               -- ERROR
 ERROR:  invalid input syntax for type real: "aaa"
 CONTEXT:  COPY agg_bad, line 3, column b: "aaa"
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+NOTICE:  1 row was skipped due to data type incompatibility
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
+SELECT * FROM agg_bad;
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
+ANALYZE agg_bad;
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
index d16821f8e1b46081ceb523e7629a79c7458a0467..043204c3e7ec447151be1923adadcda6025af341 100644 (file)
@@ -22,6 +22,7 @@
 #include "catalog/pg_authid.h"
 #include "catalog/pg_foreign_table.h"
 #include "commands/copy.h"
+#include "commands/copyfrom_internal.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
@@ -74,6 +75,8 @@ static const struct FileFdwOption valid_options[] = {
    {"null", ForeignTableRelationId},
    {"default", ForeignTableRelationId},
    {"encoding", ForeignTableRelationId},
+   {"on_error", ForeignTableRelationId},
+   {"log_verbosity", ForeignTableRelationId},
    {"force_not_null", AttributeRelationId},
    {"force_null", AttributeRelationId},
 
@@ -723,38 +726,74 @@ fileIterateForeignScan(ForeignScanState *node)
    FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
    EState     *estate = CreateExecutorState();
    ExprContext *econtext;
-   MemoryContext oldcontext;
+   MemoryContext oldcontext = CurrentMemoryContext;
    TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
-   bool        found;
+   CopyFromState cstate = festate->cstate;
    ErrorContextCallback errcallback;
 
    /* Set up callback to identify error line number. */
    errcallback.callback = CopyFromErrorCallback;
-   errcallback.arg = (void *) festate->cstate;
+   errcallback.arg = (void *) cstate;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;
 
    /*
-    * The protocol for loading a virtual tuple into a slot is first
-    * ExecClearTuple, then fill the values/isnull arrays, then
-    * ExecStoreVirtualTuple.  If we don't find another row in the file, we
-    * just skip the last step, leaving the slot empty as required.
-    *
     * We pass ExprContext because there might be a use of the DEFAULT option
     * in COPY FROM, so we may need to evaluate default expressions.
     */
-   ExecClearTuple(slot);
    econtext = GetPerTupleExprContext(estate);
 
+retry:
+
    /*
     * DEFAULT expressions need to be evaluated in a per-tuple context, so
     * switch in case we are doing that.
     */
-   oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-   found = NextCopyFrom(festate->cstate, econtext,
-                        slot->tts_values, slot->tts_isnull);
-   if (found)
+   MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+   /*
+    * The protocol for loading a virtual tuple into a slot is first
+    * ExecClearTuple, then fill the values/isnull arrays, then
+    * ExecStoreVirtualTuple.  If we don't find another row in the file, we
+    * just skip the last step, leaving the slot empty as required.
+    *
+    */
+   ExecClearTuple(slot);
+
+   if (NextCopyFrom(cstate, econtext, slot->tts_values, slot->tts_isnull))
+   {
+       if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+           cstate->escontext->error_occurred)
+       {
+           /*
+            * Soft error occurred, skip this tuple and just make
+            * ErrorSaveContext ready for the next NextCopyFrom. Since we
+            * don't set details_wanted and error_data is not to be filled,
+            * just resetting error_occurred is enough.
+            */
+           cstate->escontext->error_occurred = false;
+
+           /* Switch back to original memory context */
+           MemoryContextSwitchTo(oldcontext);
+
+           /*
+            * Make sure we are interruptible while repeatedly calling
+            * NextCopyFrom() until no soft error occurs.
+            */
+           CHECK_FOR_INTERRUPTS();
+
+           /*
+            * Reset the per-tuple exprcontext, to clean-up after expression
+            * evaluations etc.
+            */
+           ResetPerTupleExprContext(estate);
+
+           /* Repeat NextCopyFrom() until no soft error occurs */
+           goto retry;
+       }
+
        ExecStoreVirtualTuple(slot);
+   }
 
    /* Switch back to original memory context */
    MemoryContextSwitchTo(oldcontext);
@@ -796,8 +835,19 @@ fileEndForeignScan(ForeignScanState *node)
    FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
 
    /* if festate is NULL, we are in EXPLAIN; nothing to do */
-   if (festate)
-       EndCopyFrom(festate->cstate);
+   if (!festate)
+       return;
+
+   if (festate->cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+       festate->cstate->num_errors > 0 &&
+       festate->cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
+       ereport(NOTICE,
+               errmsg_plural("%llu row was skipped due to data type incompatibility",
+                             "%llu rows were skipped due to data type incompatibility",
+                             (unsigned long long) festate->cstate->num_errors,
+                             (unsigned long long) festate->cstate->num_errors));
+
+   EndCopyFrom(festate->cstate);
 }
 
 /*
@@ -1113,7 +1163,8 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
  * which must have at least targrows entries.
  * The actual number of rows selected is returned as the function result.
  * We also count the total number of rows in the file and return it into
- * *totalrows.  Note that *totaldeadrows is always set to 0.
+ * *totalrows.  Rows skipped due to on_error = 'ignore' are not included
+ * in this count.  Note that *totaldeadrows is always set to 0.
  *
  * Note that the returned list of rows is not always in order by physical
  * position in the file.  Therefore, correlation estimates derived later
@@ -1191,6 +1242,21 @@ file_acquire_sample_rows(Relation onerel, int elevel,
        if (!found)
            break;
 
+       if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+           cstate->escontext->error_occurred)
+       {
+           /*
+            * Soft error occurred, skip this tuple and just make
+            * ErrorSaveContext ready for the next NextCopyFrom. Since we
+            * don't set details_wanted and error_data is not to be filled,
+            * just resetting error_occurred is enough.
+            */
+           cstate->escontext->error_occurred = false;
+
+           /* Repeat NextCopyFrom() until no soft error occurs */
+           continue;
+       }
+
        /*
         * The first targrows sample rows are simply copied into the
         * reservoir.  Then we start replacing tuples in the sample until we
@@ -1236,6 +1302,15 @@ file_acquire_sample_rows(Relation onerel, int elevel,
    /* Clean up. */
    MemoryContextDelete(tupcontext);
 
+   if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+       cstate->num_errors > 0 &&
+       cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
+       ereport(NOTICE,
+               errmsg_plural("%llu row was skipped due to data type incompatibility",
+                             "%llu rows were skipped due to data type incompatibility",
+                             (unsigned long long) cstate->num_errors,
+                             (unsigned long long) cstate->num_errors));
+
    EndCopyFrom(cstate);
 
    pfree(values);
index f0548e14e18452789cd83b4fa8f6d88a6b9284a9..edd77c5cd208e7be04052509b08ab46f3f25c287 100644 (file)
@@ -150,6 +150,13 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 -- error context report tests
 SELECT * FROM agg_bad;               -- ERROR
 
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
+SELECT * FROM agg_bad;
+ANALYZE agg_bad;
+
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
index f2f2af9a5962fda8f35da47f3f017339f322a541..bb3579b0777781c509f1eba5b20dbff0ed047ef6 100644 (file)
    </listitem>
   </varlistentry>
 
+  <varlistentry>
+   <term><literal>on_error</literal></term>
+
+   <listitem>
+    <para>
+     Specifies how to behave when encountering an error converting a column's
+     input value into its data type,
+     the same as <command>COPY</command>'s <literal>ON_ERROR</literal> option.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry>
+   <term><literal>log_verbosity</literal></term>
+
+   <listitem>
+    <para>
+     Specifies the amount of messages emitted by <literal>file_fdw</literal>,
+     the same as <command>COPY</command>'s <literal>LOG_VERBOSITY</literal> option.
+    </para>
+   </listitem>
+  </varlistentry>
+
  </variablelist>
 
  <para>