Content-Length: 645903 | pFad | http://github.com/temporalio/sdk-java/pull/2464/files

01 Set TemporalChangeVersion when workflow version is updated by Quinn-With-Two-Ns · Pull Request #2464 · temporalio/sdk-java · GitHub
Skip to content

Set TemporalChangeVersion when workflow version is updated #2464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
package io.temporal.internal.history;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
import io.temporal.common.SearchAttributeKey;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.internal.common.SearchAttributesUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

public class VersionMarkerUtils {
public static final String MARKER_NAME = "Version";
public static final String MARKER_CHANGE_ID_KEY = "changeId";
public static final String MARKER_VERSION_KEY = "version";
// Key used to store if an upsert version search attribute was written while writing the marker.
public static final String VERSION_SA_UPDATED_KEY = "versionSearchAttributeUpdated";

// TemporalChangeVersion is used as search attributes key to find workflows with specific change
// version.
@VisibleForTesting
public static final SearchAttributeKey<List<String>> TEMPORAL_CHANGE_VERSION =
SearchAttributeKey.forKeywordList("TemporalChangeVersion");

// Limit the size of the change version search attribute to avoid exceeding the server limits.
// Exceeding the limit
// will result in the search attribute not being added.
public static final int CHANGE_VERSION_SEARCH_ATTRIBUTE_SIZE_LIMIT = 2048;

/**
* @param event {@code HistoryEvent} to parse
Expand Down Expand Up @@ -55,17 +74,47 @@ public static Integer getVersion(MarkerRecordedEventAttributes markerAttributes)
return MarkerUtils.getValueFromMarker(markerAttributes, MARKER_VERSION_KEY, Integer.class);
}

@Nullable
public static boolean getUpsertVersionSA(MarkerRecordedEventAttributes markerAttributes) {
Boolean versionSearchAttributeUpdated =
MarkerUtils.getValueFromMarker(markerAttributes, VERSION_SA_UPDATED_KEY, Boolean.class);
return versionSearchAttributeUpdated != null && versionSearchAttributeUpdated;
}

public static RecordMarkerCommandAttributes createMarkerAttributes(
String changeId, Integer version) {
String changeId, Integer version, Boolean upsertVersionSA) {
Preconditions.checkNotNull(version, "version");
Map<String, Payloads> details = new HashMap<>();
details.put(
MARKER_CHANGE_ID_KEY, DefaultDataConverter.STANDARD_INSTANCE.toPayloads(changeId).get());
details.put(
MARKER_VERSION_KEY, DefaultDataConverter.STANDARD_INSTANCE.toPayloads(version).get());
if (upsertVersionSA) {
details.put(
VERSION_SA_UPDATED_KEY, DefaultDataConverter.STANDARD_INSTANCE.toPayloads(true).get());
}
return RecordMarkerCommandAttributes.newBuilder()
.setMarkerName(MARKER_NAME)
.putAllDetails(details)
.build();
}

public static String createChangeId(String changeId, Integer version) {
return changeId + "-" + version;
}

public static SearchAttributes createVersionMarkerSearchAttributes(
String newChangeId, Integer newVersion, Map<String, Integer> existingVersions) {
List<String> changeVersions = new ArrayList<>(existingVersions.size() + 1);
existingVersions.entrySet().stream()
.map(entry -> createChangeId(entry.getKey(), entry.getValue()))
.forEach(changeVersions::add);
changeVersions.add(createChangeId(newChangeId, newVersion));
SearchAttributes sa =
SearchAttributesUtil.encodeTyped(
io.temporal.common.SearchAttributes.newBuilder()
.set(TEMPORAL_CHANGE_VERSION, changeVersions)
.build());
return sa;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.temporal.internal.replay;

import static io.temporal.internal.history.VersionMarkerUtils.TEMPORAL_CHANGE_VERSION;

import com.uber.m3.tally.Scope;
import io.temporal.api.command.v1.*;
import io.temporal.api.common.v1.*;
Expand All @@ -20,12 +22,15 @@
import java.util.*;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TODO callbacks usage is non consistent. It accepts Optional and Exception which can be null.
* Switch both to nullable.
*/
final class ReplayWorkflowContextImpl implements ReplayWorkflowContext {
private static final Logger log = LoggerFactory.getLogger(ReplayWorkflowContextImpl.class);
private final BasicWorkflowContext basicWorkflowContext;
private final WorkflowStateMachines workflowStateMachines;
private final WorkflowMutableState mutableState;
Expand Down Expand Up @@ -336,6 +341,18 @@ public long currentTimeMillis() {

@Override
public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) {
/*
* Temporal Change Version is a reserved field and should ideally not be set by the user.
* It is set by the SDK when getVersion is called. We know that users have been setting
* this field in the past, and we want to avoid breaking their workflows.
* */
if (searchAttributes.containsIndexedFields(TEMPORAL_CHANGE_VERSION.getName())) {
// When we enabled upserting of the search attribute by default, we should consider raising a
// warning here.
log.debug(
"{} is a reserved field. This can be set automatically by the SDK by calling `setEnableUpsertVersionSearchAttributes` on your `WorkflowImplementationOptions`",
TEMPORAL_CHANGE_VERSION.getName());
}
workflowStateMachines.upsertSearchAttributes(searchAttributes);
mutableState.upsertSearchAttributes(searchAttributes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
this.workflow = workflow;

this.workflowStateMachines =
new WorkflowStateMachines(new StatesMachinesCallbackImpl(), capabilities);
new WorkflowStateMachines(
new StatesMachinesCallbackImpl(),
capabilities,
workflow.getWorkflowContext() == null
? WorkflowImplementationOptions.newBuilder().build()
: workflow.getWorkflowContext().getWorkflowImplementationOptions());
String fullReplayDirectQueryType =
workflowTask.hasQuery() ? workflowTask.getQuery().getQueryType() : null;
this.context =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
Expand All @@ -27,6 +29,16 @@ final class VersionStateMachine {

@Nullable private Integer version;

/** This flag is used to determine if the search attribute for the version change was written. */
@Nullable private Boolean writeVersionChangeSA = false;

/**
* This flag is used to determine if the search attribute for the version change has been written
* by this state machine. This is used to prevent writing the search attribute multiple times if
* getVersion is called repeatedly.
*/
private boolean hasWrittenVersionChangeSA = false;

/**
* This variable is used for replay only. When we replay, we look one workflow task ahead and
* preload all version markers to be able to return from Workflow.getVersion called in the event
Expand Down Expand Up @@ -121,14 +133,18 @@ class InvocationStateMachine

private final int minSupported;
private final int maxSupported;

private final Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback;
private final Functions.Proc2<Integer, RuntimeException> resultCallback;

InvocationStateMachine(
int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
int minSupported,
int maxSupported,
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
Functions.Proc2<Integer, RuntimeException> callback) {
super(STATE_MACHINE_DEFINITION, VersionStateMachine.this.commandSink, stateMachineSink);
this.minSupported = minSupported;
this.maxSupported = maxSupported;
this.upsertSearchAttributeCallback = upsertSearchAttributeCallback;
this.resultCallback = Objects.requireNonNull(callback);
}

Expand Down Expand Up @@ -220,9 +236,16 @@ State createMarkerExecuting() {
return State.SKIPPED;
} else {
version = maxSupported;
SearchAttributes sa = upsertSearchAttributeCallback.apply(version);
writeVersionChangeSA = sa != null;
RecordMarkerCommandAttributes markerAttributes =
VersionMarkerUtils.createMarkerAttributes(changeId, version);
addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes));
VersionMarkerUtils.createMarkerAttributes(changeId, version, writeVersionChangeSA);
Command markerCommand = StateMachineCommandUtils.createRecordMarker(markerAttributes);
addCommand(markerCommand);
if (writeVersionChangeSA) {
hasWrittenVersionChangeSA = true;
UpsertSearchAttributesStateMachine.newInstance(sa, commandSink, stateMachineSink);
}
return State.MARKER_COMMAND_CREATED;
}
}
Expand Down Expand Up @@ -255,6 +278,13 @@ void notifyMarkerCreatedReplaying() {
State createMarkerReplaying() {
createFakeCommand();
if (preloadedVersion != null) {
if (writeVersionChangeSA && !hasWrittenVersionChangeSA) {
hasWrittenVersionChangeSA = true;
if (writeVersionChangeSA) {
UpsertSearchAttributesStateMachine.newInstance(
SearchAttributes.newBuilder().build(), commandSink, stateMachineSink);
}
}
return State.MARKER_COMMAND_CREATED_REPLAYING;
} else {
return State.SKIPPED_REPLAYING;
Expand Down Expand Up @@ -311,7 +341,7 @@ private void updateVersionFromEvent(HistoryEvent event) {
version = getVersionFromEvent(event);
}

private void preloadVersionFromEvent(HistoryEvent event) {
private Integer preloadVersionFromEvent(HistoryEvent event) {
if (version != null) {
throw new NonDeterministicException(
"Version is already set to " + version + ". " + RETROACTIVE_ADDITION_ERROR_STRING);
Expand All @@ -324,6 +354,7 @@ private void preloadVersionFromEvent(HistoryEvent event) {
preloadedVersion);

preloadedVersion = getVersionFromEvent(event);
return preloadedVersion;
}

void flushPreloadedVersionAndUpdateFromEvent(HistoryEvent event) {
Expand Down Expand Up @@ -360,8 +391,13 @@ private VersionStateMachine(
* @return True if the identifier is not present in history
*/
public Integer getVersion(
int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
InvocationStateMachine ism = new InvocationStateMachine(minSupported, maxSupported, callback);
int minSupported,
int maxSupported,
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
Functions.Proc2<Integer, RuntimeException> callback) {
InvocationStateMachine ism =
new InvocationStateMachine(
minSupported, maxSupported, upsertSearchAttributeCallback, callback);
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
ism.explicitEvent(ExplicitEvent.SCHEDULE);
// If the state is SKIPPED_REPLAYING that means we:
Expand All @@ -373,12 +409,16 @@ public Integer getVersion(
return version == null ? preloadedVersion : version;
}

public boolean isWriteVersionChangeSA() {
return writeVersionChangeSA;
}

public void handleNonMatchingEvent(HistoryEvent event) {
flushPreloadedVersionAndUpdateFromEvent(event);
}

public void handleMarkersPreload(HistoryEvent event) {
preloadVersionFromEvent(event);
public Integer handleMarkersPreload(HistoryEvent event) {
return preloadVersionFromEvent(event);
}

private int getVersionFromEvent(HistoryEvent event) {
Expand All @@ -398,6 +438,13 @@ private int getVersionFromEvent(HistoryEvent event) {
Integer version = VersionMarkerUtils.getVersion(event.getMarkerRecordedEventAttributes());
Preconditions.checkArgument(version != null, "Marker details missing required version key");

writeVersionChangeSA =
VersionMarkerUtils.getUpsertVersionSA(event.getMarkerRecordedEventAttributes());
// Old SDKs didn't write the version change search attribute. So, if it is not present then
// default to false.
if (writeVersionChangeSA == null) {
writeVersionChangeSA = false;
}
return version;
}
}
Loading
Loading








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/temporalio/sdk-java/pull/2464/files

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy