Content-Length: 915142 | pFad | http://github.com/temporalio/sdk-java/commit/44d9abe97984129cd1185f181feb0fca63b326aa

C5 Set TemporalChangeVersion when workflow version is updated (#2464) · temporalio/sdk-java@44d9abe · GitHub
Skip to content

Commit 44d9abe

Browse files
Set TemporalChangeVersion when workflow version is updated (#2464)
Upsert TemporalChangeVersion when workflow version is updated
1 parent 8613b18 commit 44d9abe

File tree

38 files changed

+3415
-57
lines changed

38 files changed

+3415
-57
lines changed

temporal-sdk/src/main/java/io/temporal/internal/history/VersionMarkerUtils.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,39 @@
11
package io.temporal.internal.history;
22

3+
import com.google.common.annotations.VisibleForTesting;
34
import com.google.common.base.Preconditions;
45
import io.temporal.api.command.v1.Command;
56
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
67
import io.temporal.api.common.v1.Payloads;
8+
import io.temporal.api.common.v1.SearchAttributes;
79
import io.temporal.api.history.v1.HistoryEvent;
810
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
11+
import io.temporal.common.SearchAttributeKey;
912
import io.temporal.common.converter.DefaultDataConverter;
13+
import io.temporal.internal.common.SearchAttributesUtil;
14+
import java.util.ArrayList;
1015
import java.util.HashMap;
16+
import java.util.List;
1117
import java.util.Map;
1218
import javax.annotation.Nullable;
1319

1420
public class VersionMarkerUtils {
1521
public static final String MARKER_NAME = "Version";
1622
public static final String MARKER_CHANGE_ID_KEY = "changeId";
1723
public static final String MARKER_VERSION_KEY = "version";
24+
// Key used to store if an upsert version search attribute was written while writing the marker.
25+
public static final String VERSION_SA_UPDATED_KEY = "versionSearchAttributeUpdated";
26+
27+
// TemporalChangeVersion is used as search attributes key to find workflows with specific change
28+
// version.
29+
@VisibleForTesting
30+
public static final SearchAttributeKey<List<String>> TEMPORAL_CHANGE_VERSION =
31+
SearchAttributeKey.forKeywordList("TemporalChangeVersion");
32+
33+
// Limit the size of the change version search attribute to avoid exceeding the server limits.
34+
// Exceeding the limit
35+
// will result in the search attribute not being added.
36+
public static final int CHANGE_VERSION_SEARCH_ATTRIBUTE_SIZE_LIMIT = 2048;
1837

1938
/**
2039
* @param event {@code HistoryEvent} to parse
@@ -55,17 +74,47 @@ public static Integer getVersion(MarkerRecordedEventAttributes markerAttributes)
5574
return MarkerUtils.getValueFromMarker(markerAttributes, MARKER_VERSION_KEY, Integer.class);
5675
}
5776

77+
@Nullable
78+
public static boolean getUpsertVersionSA(MarkerRecordedEventAttributes markerAttributes) {
79+
Boolean versionSearchAttributeUpdated =
80+
MarkerUtils.getValueFromMarker(markerAttributes, VERSION_SA_UPDATED_KEY, Boolean.class);
81+
return versionSearchAttributeUpdated != null && versionSearchAttributeUpdated;
82+
}
83+
5884
public static RecordMarkerCommandAttributes createMarkerAttributes(
59-
String changeId, Integer version) {
85+
String changeId, Integer version, Boolean upsertVersionSA) {
6086
Preconditions.checkNotNull(version, "version");
6187
Map<String, Payloads> details = new HashMap<>();
6288
details.put(
6389
MARKER_CHANGE_ID_KEY, DefaultDataConverter.STANDARD_INSTANCE.toPayloads(changeId).get());
6490
details.put(
6591
MARKER_VERSION_KEY, DefaultDataConverter.STANDARD_INSTANCE.toPayloads(version).get());
92+
if (upsertVersionSA) {
93+
details.put(
94+
VERSION_SA_UPDATED_KEY, DefaultDataConverter.STANDARD_INSTANCE.toPayloads(true).get());
95+
}
6696
return RecordMarkerCommandAttributes.newBuilder()
6797
.setMarkerName(MARKER_NAME)
6898
.putAllDetails(details)
6999
.build();
70100
}
101+
102+
public static String createChangeId(String changeId, Integer version) {
103+
return changeId + "-" + version;
104+
}
105+
106+
public static SearchAttributes createVersionMarkerSearchAttributes(
107+
String newChangeId, Integer newVersion, Map<String, Integer> existingVersions) {
108+
List<String> changeVersions = new ArrayList<>(existingVersions.size() + 1);
109+
existingVersions.entrySet().stream()
110+
.map(entry -> createChangeId(entry.getKey(), entry.getValue()))
111+
.forEach(changeVersions::add);
112+
changeVersions.add(createChangeId(newChangeId, newVersion));
113+
SearchAttributes sa =
114+
SearchAttributesUtil.encodeTyped(
115+
io.temporal.common.SearchAttributes.newBuilder()
116+
.set(TEMPORAL_CHANGE_VERSION, changeVersions)
117+
.build());
118+
return sa;
119+
}
71120
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.temporal.internal.replay;
22

3+
import static io.temporal.internal.history.VersionMarkerUtils.TEMPORAL_CHANGE_VERSION;
4+
35
import com.uber.m3.tally.Scope;
46
import io.temporal.api.command.v1.*;
57
import io.temporal.api.common.v1.*;
@@ -20,12 +22,15 @@
2022
import java.util.*;
2123
import javax.annotation.Nonnull;
2224
import javax.annotation.Nullable;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2327

2428
/**
2529
* TODO callbacks usage is non consistent. It accepts Optional and Exception which can be null.
2630
* Switch both to nullable.
2731
*/
2832
final class ReplayWorkflowContextImpl implements ReplayWorkflowContext {
33+
private static final Logger log = LoggerFactory.getLogger(ReplayWorkflowContextImpl.class);
2934
private final BasicWorkflowContext basicWorkflowContext;
3035
private final WorkflowStateMachines workflowStateMachines;
3136
private final WorkflowMutableState mutableState;
@@ -336,6 +341,18 @@ public long currentTimeMillis() {
336341

337342
@Override
338343
public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) {
344+
/*
345+
* Temporal Change Version is a reserved field and should ideally not be set by the user.
346+
* It is set by the SDK when getVersion is called. We know that users have been setting
347+
* this field in the past, and we want to avoid breaking their workflows.
348+
* */
349+
if (searchAttributes.containsIndexedFields(TEMPORAL_CHANGE_VERSION.getName())) {
350+
// When we enabled upserting of the search attribute by default, we should consider raising a
351+
// warning here.
352+
log.debug(
353+
"{} is a reserved field. This can be set automatically by the SDK by calling `setEnableUpsertVersionSearchAttributes` on your `WorkflowImplementationOptions`",
354+
TEMPORAL_CHANGE_VERSION.getName());
355+
}
339356
workflowStateMachines.upsertSearchAttributes(searchAttributes);
340357
mutableState.upsertSearchAttributes(searchAttributes);
341358
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,12 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
9494
this.workflow = workflow;
9595

9696
this.workflowStateMachines =
97-
new WorkflowStateMachines(new StatesMachinesCallbackImpl(), capabilities);
97+
new WorkflowStateMachines(
98+
new StatesMachinesCallbackImpl(),
99+
capabilities,
100+
workflow.getWorkflowContext() == null
101+
? WorkflowImplementationOptions.newBuilder().build()
102+
: workflow.getWorkflowContext().getWorkflowImplementationOptions());
98103
String fullReplayDirectQueryType =
99104
workflowTask.hasQuery() ? workflowTask.getQuery().getQueryType() : null;
100105
this.context =

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import com.google.common.annotations.VisibleForTesting;
77
import com.google.common.base.Preconditions;
88
import com.google.common.base.Strings;
9+
import io.temporal.api.command.v1.Command;
910
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
11+
import io.temporal.api.common.v1.SearchAttributes;
1012
import io.temporal.api.enums.v1.CommandType;
1113
import io.temporal.api.enums.v1.EventType;
1214
import io.temporal.api.history.v1.HistoryEvent;
@@ -27,6 +29,16 @@ final class VersionStateMachine {
2729

2830
@Nullable private Integer version;
2931

32+
/** This flag is used to determine if the search attribute for the version change was written. */
33+
@Nullable private Boolean writeVersionChangeSA = false;
34+
35+
/**
36+
* This flag is used to determine if the search attribute for the version change has been written
37+
* by this state machine. This is used to prevent writing the search attribute multiple times if
38+
* getVersion is called repeatedly.
39+
*/
40+
private boolean hasWrittenVersionChangeSA = false;
41+
3042
/**
3143
* This variable is used for replay only. When we replay, we look one workflow task ahead and
3244
* preload all version markers to be able to return from Workflow.getVersion called in the event
@@ -121,14 +133,18 @@ class InvocationStateMachine
121133

122134
private final int minSupported;
123135
private final int maxSupported;
124-
136+
private final Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback;
125137
private final Functions.Proc2<Integer, RuntimeException> resultCallback;
126138

127139
InvocationStateMachine(
128-
int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
140+
int minSupported,
141+
int maxSupported,
142+
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
143+
Functions.Proc2<Integer, RuntimeException> callback) {
129144
super(STATE_MACHINE_DEFINITION, VersionStateMachine.this.commandSink, stateMachineSink);
130145
this.minSupported = minSupported;
131146
this.maxSupported = maxSupported;
147+
this.upsertSearchAttributeCallback = upsertSearchAttributeCallback;
132148
this.resultCallback = Objects.requireNonNull(callback);
133149
}
134150

@@ -220,9 +236,16 @@ State createMarkerExecuting() {
220236
return State.SKIPPED;
221237
} else {
222238
version = maxSupported;
239+
SearchAttributes sa = upsertSearchAttributeCallback.apply(version);
240+
writeVersionChangeSA = sa != null;
223241
RecordMarkerCommandAttributes markerAttributes =
224-
VersionMarkerUtils.createMarkerAttributes(changeId, version);
225-
addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes));
242+
VersionMarkerUtils.createMarkerAttributes(changeId, version, writeVersionChangeSA);
243+
Command markerCommand = StateMachineCommandUtils.createRecordMarker(markerAttributes);
244+
addCommand(markerCommand);
245+
if (writeVersionChangeSA) {
246+
hasWrittenVersionChangeSA = true;
247+
UpsertSearchAttributesStateMachine.newInstance(sa, commandSink, stateMachineSink);
248+
}
226249
return State.MARKER_COMMAND_CREATED;
227250
}
228251
}
@@ -255,6 +278,13 @@ void notifyMarkerCreatedReplaying() {
255278
State createMarkerReplaying() {
256279
createFakeCommand();
257280
if (preloadedVersion != null) {
281+
if (writeVersionChangeSA && !hasWrittenVersionChangeSA) {
282+
hasWrittenVersionChangeSA = true;
283+
if (writeVersionChangeSA) {
284+
UpsertSearchAttributesStateMachine.newInstance(
285+
SearchAttributes.newBuilder().build(), commandSink, stateMachineSink);
286+
}
287+
}
258288
return State.MARKER_COMMAND_CREATED_REPLAYING;
259289
} else {
260290
return State.SKIPPED_REPLAYING;
@@ -311,7 +341,7 @@ private void updateVersionFromEvent(HistoryEvent event) {
311341
version = getVersionFromEvent(event);
312342
}
313343

314-
private void preloadVersionFromEvent(HistoryEvent event) {
344+
private Integer preloadVersionFromEvent(HistoryEvent event) {
315345
if (version != null) {
316346
throw new NonDeterministicException(
317347
"Version is already set to " + version + ". " + RETROACTIVE_ADDITION_ERROR_STRING);
@@ -324,6 +354,7 @@ private void preloadVersionFromEvent(HistoryEvent event) {
324354
preloadedVersion);
325355

326356
preloadedVersion = getVersionFromEvent(event);
357+
return preloadedVersion;
327358
}
328359

329360
void flushPreloadedVersionAndUpdateFromEvent(HistoryEvent event) {
@@ -360,8 +391,13 @@ private VersionStateMachine(
360391
* @return True if the identifier is not present in history
361392
*/
362393
public Integer getVersion(
363-
int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
364-
InvocationStateMachine ism = new InvocationStateMachine(minSupported, maxSupported, callback);
394+
int minSupported,
395+
int maxSupported,
396+
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
397+
Functions.Proc2<Integer, RuntimeException> callback) {
398+
InvocationStateMachine ism =
399+
new InvocationStateMachine(
400+
minSupported, maxSupported, upsertSearchAttributeCallback, callback);
365401
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
366402
ism.explicitEvent(ExplicitEvent.SCHEDULE);
367403
// If the state is SKIPPED_REPLAYING that means we:
@@ -373,12 +409,16 @@ public Integer getVersion(
373409
return version == null ? preloadedVersion : version;
374410
}
375411

412+
public boolean isWriteVersionChangeSA() {
413+
return writeVersionChangeSA;
414+
}
415+
376416
public void handleNonMatchingEvent(HistoryEvent event) {
377417
flushPreloadedVersionAndUpdateFromEvent(event);
378418
}
379419

380-
public void handleMarkersPreload(HistoryEvent event) {
381-
preloadVersionFromEvent(event);
420+
public Integer handleMarkersPreload(HistoryEvent event) {
421+
return preloadVersionFromEvent(event);
382422
}
383423

384424
private int getVersionFromEvent(HistoryEvent event) {
@@ -398,6 +438,13 @@ private int getVersionFromEvent(HistoryEvent event) {
398438
Integer version = VersionMarkerUtils.getVersion(event.getMarkerRecordedEventAttributes());
399439
Preconditions.checkArgument(version != null, "Marker details missing required version key");
400440

441+
writeVersionChangeSA =
442+
VersionMarkerUtils.getUpsertVersionSA(event.getMarkerRecordedEventAttributes());
443+
// Old SDKs didn't write the version change search attribute. So, if it is not present then
444+
// default to false.
445+
if (writeVersionChangeSA == null) {
446+
writeVersionChangeSA = false;
447+
}
401448
return version;
402449
}
403450
}

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/temporalio/sdk-java/commit/44d9abe97984129cd1185f181feb0fca63b326aa

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy