6
6
import com .google .common .annotations .VisibleForTesting ;
7
7
import com .google .common .base .Preconditions ;
8
8
import com .google .common .base .Strings ;
9
+ import io .temporal .api .command .v1 .Command ;
9
10
import io .temporal .api .command .v1 .RecordMarkerCommandAttributes ;
11
+ import io .temporal .api .common .v1 .SearchAttributes ;
10
12
import io .temporal .api .enums .v1 .CommandType ;
11
13
import io .temporal .api .enums .v1 .EventType ;
12
14
import io .temporal .api .history .v1 .HistoryEvent ;
@@ -27,6 +29,16 @@ final class VersionStateMachine {
27
29
28
30
@ Nullable private Integer version ;
29
31
32
+ /** This flag is used to determine if the search attribute for the version change was written. */
33
+ @ Nullable private Boolean writeVersionChangeSA = false ;
34
+
35
+ /**
36
+ * This flag is used to determine if the search attribute for the version change has been written
37
+ * by this state machine. This is used to prevent writing the search attribute multiple times if
38
+ * getVersion is called repeatedly.
39
+ */
40
+ private boolean hasWrittenVersionChangeSA = false ;
41
+
30
42
/**
31
43
* This variable is used for replay only. When we replay, we look one workflow task ahead and
32
44
* preload all version markers to be able to return from Workflow.getVersion called in the event
@@ -121,14 +133,18 @@ class InvocationStateMachine
121
133
122
134
private final int minSupported ;
123
135
private final int maxSupported ;
124
-
136
+ private final Functions . Func1 < Integer , SearchAttributes > upsertSearchAttributeCallback ;
125
137
private final Functions .Proc2 <Integer , RuntimeException > resultCallback ;
126
138
127
139
InvocationStateMachine (
128
- int minSupported , int maxSupported , Functions .Proc2 <Integer , RuntimeException > callback ) {
140
+ int minSupported ,
141
+ int maxSupported ,
142
+ Functions .Func1 <Integer , SearchAttributes > upsertSearchAttributeCallback ,
143
+ Functions .Proc2 <Integer , RuntimeException > callback ) {
129
144
super (STATE_MACHINE_DEFINITION , VersionStateMachine .this .commandSink , stateMachineSink );
130
145
this .minSupported = minSupported ;
131
146
this .maxSupported = maxSupported ;
147
+ this .upsertSearchAttributeCallback = upsertSearchAttributeCallback ;
132
148
this .resultCallback = Objects .requireNonNull (callback );
133
149
}
134
150
@@ -220,9 +236,16 @@ State createMarkerExecuting() {
220
236
return State .SKIPPED ;
221
237
} else {
222
238
version = maxSupported ;
239
+ SearchAttributes sa = upsertSearchAttributeCallback .apply (version );
240
+ writeVersionChangeSA = sa != null ;
223
241
RecordMarkerCommandAttributes markerAttributes =
224
- VersionMarkerUtils .createMarkerAttributes (changeId , version );
225
- addCommand (StateMachineCommandUtils .createRecordMarker (markerAttributes ));
242
+ VersionMarkerUtils .createMarkerAttributes (changeId , version , writeVersionChangeSA );
243
+ Command markerCommand = StateMachineCommandUtils .createRecordMarker (markerAttributes );
244
+ addCommand (markerCommand );
245
+ if (writeVersionChangeSA ) {
246
+ hasWrittenVersionChangeSA = true ;
247
+ UpsertSearchAttributesStateMachine .newInstance (sa , commandSink , stateMachineSink );
248
+ }
226
249
return State .MARKER_COMMAND_CREATED ;
227
250
}
228
251
}
@@ -255,6 +278,13 @@ void notifyMarkerCreatedReplaying() {
255
278
State createMarkerReplaying () {
256
279
createFakeCommand ();
257
280
if (preloadedVersion != null ) {
281
+ if (writeVersionChangeSA && !hasWrittenVersionChangeSA ) {
282
+ hasWrittenVersionChangeSA = true ;
283
+ if (writeVersionChangeSA ) {
284
+ UpsertSearchAttributesStateMachine .newInstance (
285
+ SearchAttributes .newBuilder ().build (), commandSink , stateMachineSink );
286
+ }
287
+ }
258
288
return State .MARKER_COMMAND_CREATED_REPLAYING ;
259
289
} else {
260
290
return State .SKIPPED_REPLAYING ;
@@ -311,7 +341,7 @@ private void updateVersionFromEvent(HistoryEvent event) {
311
341
version = getVersionFromEvent (event );
312
342
}
313
343
314
- private void preloadVersionFromEvent (HistoryEvent event ) {
344
+ private Integer preloadVersionFromEvent (HistoryEvent event ) {
315
345
if (version != null ) {
316
346
throw new NonDeterministicException (
317
347
"Version is already set to " + version + ". " + RETROACTIVE_ADDITION_ERROR_STRING );
@@ -324,6 +354,7 @@ private void preloadVersionFromEvent(HistoryEvent event) {
324
354
preloadedVersion );
325
355
326
356
preloadedVersion = getVersionFromEvent (event );
357
+ return preloadedVersion ;
327
358
}
328
359
329
360
void flushPreloadedVersionAndUpdateFromEvent (HistoryEvent event ) {
@@ -360,8 +391,13 @@ private VersionStateMachine(
360
391
* @return True if the identifier is not present in history
361
392
*/
362
393
public Integer getVersion (
363
- int minSupported , int maxSupported , Functions .Proc2 <Integer , RuntimeException > callback ) {
364
- InvocationStateMachine ism = new InvocationStateMachine (minSupported , maxSupported , callback );
394
+ int minSupported ,
395
+ int maxSupported ,
396
+ Functions .Func1 <Integer , SearchAttributes > upsertSearchAttributeCallback ,
397
+ Functions .Proc2 <Integer , RuntimeException > callback ) {
398
+ InvocationStateMachine ism =
399
+ new InvocationStateMachine (
400
+ minSupported , maxSupported , upsertSearchAttributeCallback , callback );
365
401
ism .explicitEvent (ExplicitEvent .CHECK_EXECUTION_STATE );
366
402
ism .explicitEvent (ExplicitEvent .SCHEDULE );
367
403
// If the state is SKIPPED_REPLAYING that means we:
@@ -373,12 +409,16 @@ public Integer getVersion(
373
409
return version == null ? preloadedVersion : version ;
374
410
}
375
411
412
+ public boolean isWriteVersionChangeSA () {
413
+ return writeVersionChangeSA ;
414
+ }
415
+
376
416
public void handleNonMatchingEvent (HistoryEvent event ) {
377
417
flushPreloadedVersionAndUpdateFromEvent (event );
378
418
}
379
419
380
- public void handleMarkersPreload (HistoryEvent event ) {
381
- preloadVersionFromEvent (event );
420
+ public Integer handleMarkersPreload (HistoryEvent event ) {
421
+ return preloadVersionFromEvent (event );
382
422
}
383
423
384
424
private int getVersionFromEvent (HistoryEvent event ) {
@@ -398,6 +438,13 @@ private int getVersionFromEvent(HistoryEvent event) {
398
438
Integer version = VersionMarkerUtils .getVersion (event .getMarkerRecordedEventAttributes ());
399
439
Preconditions .checkArgument (version != null , "Marker details missing required version key" );
400
440
441
+ writeVersionChangeSA =
442
+ VersionMarkerUtils .getUpsertVersionSA (event .getMarkerRecordedEventAttributes ());
443
+ // Old SDKs didn't write the version change search attribute. So, if it is not present then
444
+ // default to false.
445
+ if (writeVersionChangeSA == null ) {
446
+ writeVersionChangeSA = false ;
447
+ }
401
448
return version ;
402
449
}
403
450
}
0 commit comments