20
20
* index_insert - insert an index tuple into a relation
21
21
* index_markpos - mark a scan position
22
22
* index_restrpos - restore a scan position
23
+ * index_parallelscan_estimate - estimate shared memory for parallel scan
24
+ * index_parallelscan_initialize - initialize parallel scan
25
+ * index_parallelrescan - (re)start a parallel scan of an index
26
+ * index_beginscan_parallel - join parallel index scan
23
27
* index_getnext_tid - get the next TID from a scan
24
28
* index_fetch_heap - get the scan's next heap tuple
25
29
* index_getnext - get the next heap tuple from a scan
@@ -120,7 +124,8 @@ do { \
120
124
} while(0)
121
125
122
126
static IndexScanDesc index_beginscan_internal (Relation indexRelation ,
123
- int nkeys , int norderbys , Snapshot snapshot );
127
+ int nkeys , int norderbys , Snapshot snapshot ,
128
+ ParallelIndexScanDesc pscan , bool temp_snap );
124
129
125
130
126
131
/* ----------------------------------------------------------------
@@ -219,7 +224,7 @@ index_beginscan(Relation heapRelation,
219
224
{
220
225
IndexScanDesc scan ;
221
226
222
- scan = index_beginscan_internal (indexRelation , nkeys , norderbys , snapshot );
227
+ scan = index_beginscan_internal (indexRelation , nkeys , norderbys , snapshot , NULL , false );
223
228
224
229
/*
225
230
* Save additional parameters into the scandesc. Everything else was set
@@ -244,7 +249,7 @@ index_beginscan_bitmap(Relation indexRelation,
244
249
{
245
250
IndexScanDesc scan ;
246
251
247
- scan = index_beginscan_internal (indexRelation , nkeys , 0 , snapshot );
252
+ scan = index_beginscan_internal (indexRelation , nkeys , 0 , snapshot , NULL , false );
248
253
249
254
/*
250
255
* Save additional parameters into the scandesc. Everything else was set
@@ -260,8 +265,11 @@ index_beginscan_bitmap(Relation indexRelation,
260
265
*/
261
266
static IndexScanDesc
262
267
index_beginscan_internal (Relation indexRelation ,
263
- int nkeys , int norderbys , Snapshot snapshot )
268
+ int nkeys , int norderbys , Snapshot snapshot ,
269
+ ParallelIndexScanDesc pscan , bool temp_snap )
264
270
{
271
+ IndexScanDesc scan ;
272
+
265
273
RELATION_CHECKS ;
266
274
CHECK_REL_PROCEDURE (ambeginscan );
267
275
@@ -276,8 +284,13 @@ index_beginscan_internal(Relation indexRelation,
276
284
/*
277
285
* Tell the AM to open a scan.
278
286
*/
279
- return indexRelation -> rd_amroutine -> ambeginscan (indexRelation , nkeys ,
287
+ scan = indexRelation -> rd_amroutine -> ambeginscan (indexRelation , nkeys ,
280
288
norderbys );
289
+ /* Initialize information for parallel scan. */
290
+ scan -> parallel_scan = pscan ;
291
+ scan -> xs_temp_snap = temp_snap ;
292
+
293
+ return scan ;
281
294
}
282
295
283
296
/* ----------------
@@ -341,6 +354,9 @@ index_endscan(IndexScanDesc scan)
341
354
/* Release index refcount acquired by index_beginscan */
342
355
RelationDecrementReferenceCount (scan -> indexRelation );
343
356
357
+ if (scan -> xs_temp_snap )
358
+ UnregisterSnapshot (scan -> xs_snapshot );
359
+
344
360
/* Release the scan data structure itself */
345
361
IndexScanEnd (scan );
346
362
}
@@ -389,6 +405,115 @@ index_restrpos(IndexScanDesc scan)
389
405
scan -> indexRelation -> rd_amroutine -> amrestrpos (scan );
390
406
}
391
407
408
+ /*
409
+ * index_parallelscan_estimate - estimate shared memory for parallel scan
410
+ *
411
+ * Currently, we don't pass any information to the AM-specific estimator,
412
+ * so it can probably only return a constant. In the future, we might need
413
+ * to pass more information.
414
+ */
415
+ Size
416
+ index_parallelscan_estimate (Relation indexRelation , Snapshot snapshot )
417
+ {
418
+ Size nbytes ;
419
+
420
+ RELATION_CHECKS ;
421
+
422
+ nbytes = offsetof(ParallelIndexScanDescData , ps_snapshot_data );
423
+ nbytes = add_size (nbytes , EstimateSnapshotSpace (snapshot ));
424
+ nbytes = MAXALIGN (nbytes );
425
+
426
+ /*
427
+ * If amestimateparallelscan is not provided, assume there is no
428
+ * AM-specific data needed. (It's hard to believe that could work, but
429
+ * it's easy enough to cater to it here.)
430
+ */
431
+ if (indexRelation -> rd_amroutine -> amestimateparallelscan != NULL )
432
+ nbytes = add_size (nbytes ,
433
+ indexRelation -> rd_amroutine -> amestimateparallelscan ());
434
+
435
+ return nbytes ;
436
+ }
437
+
438
+ /*
439
+ * index_parallelscan_initialize - initialize parallel scan
440
+ *
441
+ * We initialize both the ParallelIndexScanDesc proper and the AM-specific
442
+ * information which follows it.
443
+ *
444
+ * This function calls access method specific initialization routine to
445
+ * initialize am specific information. Call this just once in the leader
446
+ * process; then, individual workers attach via index_beginscan_parallel.
447
+ */
448
+ void
449
+ index_parallelscan_initialize (Relation heapRelation , Relation indexRelation ,
450
+ Snapshot snapshot , ParallelIndexScanDesc target )
451
+ {
452
+ Size offset ;
453
+
454
+ RELATION_CHECKS ;
455
+
456
+ offset = add_size (offsetof(ParallelIndexScanDescData , ps_snapshot_data ),
457
+ EstimateSnapshotSpace (snapshot ));
458
+ offset = MAXALIGN (offset );
459
+
460
+ target -> ps_relid = RelationGetRelid (heapRelation );
461
+ target -> ps_indexid = RelationGetRelid (indexRelation );
462
+ target -> ps_offset = offset ;
463
+ SerializeSnapshot (snapshot , target -> ps_snapshot_data );
464
+
465
+ /* aminitparallelscan is optional; assume no-op if not provided by AM */
466
+ if (indexRelation -> rd_amroutine -> aminitparallelscan != NULL )
467
+ {
468
+ void * amtarget ;
469
+
470
+ amtarget = OffsetToPointer (target , offset );
471
+ indexRelation -> rd_amroutine -> aminitparallelscan (amtarget );
472
+ }
473
+ }
474
+
475
+ /* ----------------
476
+ * index_parallelrescan - (re)start a parallel scan of an index
477
+ * ----------------
478
+ */
479
+ void
480
+ index_parallelrescan (IndexScanDesc scan )
481
+ {
482
+ SCAN_CHECKS ;
483
+
484
+ /* amparallelrescan is optional; assume no-op if not provided by AM */
485
+ if (scan -> indexRelation -> rd_amroutine -> amparallelrescan != NULL )
486
+ scan -> indexRelation -> rd_amroutine -> amparallelrescan (scan );
487
+ }
488
+
489
+ /*
490
+ * index_beginscan_parallel - join parallel index scan
491
+ *
492
+ * Caller must be holding suitable locks on the heap and the index.
493
+ */
494
+ IndexScanDesc
495
+ index_beginscan_parallel (Relation heaprel , Relation indexrel , int nkeys ,
496
+ int norderbys , ParallelIndexScanDesc pscan )
497
+ {
498
+ Snapshot snapshot ;
499
+ IndexScanDesc scan ;
500
+
501
+ Assert (RelationGetRelid (heaprel ) == pscan -> ps_relid );
502
+ snapshot = RestoreSnapshot (pscan -> ps_snapshot_data );
503
+ RegisterSnapshot (snapshot );
504
+ scan = index_beginscan_internal (indexrel , nkeys , norderbys , snapshot ,
505
+ pscan , true);
506
+
507
+ /*
508
+ * Save additional parameters into the scandesc. Everything else was set
509
+ * up by index_beginscan_internal.
510
+ */
511
+ scan -> heapRelation = heaprel ;
512
+ scan -> xs_snapshot = snapshot ;
513
+
514
+ return scan ;
515
+ }
516
+
392
517
/* ----------------
393
518
* index_getnext_tid - get the next TID from a scan
394
519
*
0 commit comments