Asynchronous actions with AWS Step Functions
AWS Step Functions is the core service our team uses. It has changed the way we look at service architecture and process orchestration, and it has shown us how humans and third-party APIs can interact with our products.
Several of our internal processes (e.g. client registration) require an action "from the outside". That could be filling in a form and clicking a button in the frontend of the application, a human checking and approving a KYC document, or just waiting for a third-party API response.
In the past we used the Activities feature of AWS Step Functions, in which activity workers poll for waiting tasks and then process them. Implementations of this can become overcomplicated and hard to understand, and for our use cases polling was also a poor fit for an event-driven architecture. That changed when AWS announced support for callback patterns in Step Functions.
With the ability to pause an execution and wait for an external service to report success (or failure) back, we can easily create a pattern that handles asynchronous actions but stays flexible enough to be adapted to various business requirements, as shown below.
The state machine execution stops at the Wait for external action step, invokes the createAsyncAction Lambda function, and remains in this state until sendTaskSuccess() is called.
createAsyncAction saves the execution information to an AWS DynamoDB table:
- execution ID as a unique identifier
- taskToken that is passed from the state machine and serves as a pointer where the execution was paused
This record remains intact until the "External service" triggers the continuation of the execution. The service invokes the deleteAsyncAction Lambda function, passing the execution ID as a parameter. That function looks up the record by execution ID to get the relevant taskToken, so that sendTaskSuccess() (or sendTaskFailure()) can be called and the state machine can continue. The DB record can then be deleted, as the execution is no longer paused.
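To make the flow above more tangible, here is a minimal local simulation of it. It does not call AWS at all: the Map objects stand in for the DynamoDB table and for the paused steps inside Step Functions, and the token format and function signatures are my own assumptions for illustration.

```javascript
// Local, in-memory simulation of the callback pattern (not the AWS API).
const pausedTasks = new Map() // taskToken -> { resolve, reject }
const actionsTable = new Map() // executionId -> taskToken (stand-in for DynamoDB)
let tokenCounter = 0

// Pauses the caller until the token is settled. onPaused receives the token,
// just like the Lambda invoked by the "wait" step does.
function waitForTaskToken(onPaused) {
  const taskToken = `token-${++tokenCounter}`
  const paused = new Promise((resolve, reject) => {
    pausedTasks.set(taskToken, { resolve, reject })
  })
  onPaused(taskToken)
  return paused
}

// Looks up a paused task and removes it, so a token can be settled only once.
function takeTask(taskToken) {
  const task = pausedTasks.get(taskToken)
  if (!task) throw new Error(`Unknown task token: ${taskToken}`)
  pausedTasks.delete(taskToken)
  return task
}

function sendTaskSuccess(taskToken, output) {
  takeTask(taskToken).resolve(output)
}

function sendTaskFailure(taskToken, error) {
  takeTask(taskToken).reject(new Error(error))
}

// Saves where the execution was paused.
function createAsyncAction(executionId, taskToken) {
  actionsTable.set(executionId, taskToken)
}

// Called by the "external service": resumes the execution, then cleans up.
function deleteAsyncAction(executionId, success = true) {
  const taskToken = actionsTable.get(executionId)
  if (taskToken === undefined) throw new Error(`No paused execution: ${executionId}`)
  if (success) sendTaskSuccess(taskToken, { executionId })
  else sendTaskFailure(taskToken, 'ActionFailed')
  actionsTable.delete(executionId)
}

// A one-step "state machine" that pauses and later continues.
async function runExecution(executionId) {
  const result = await waitForTaskToken((token) => createAsyncAction(executionId, token))
  return `resumed ${result.executionId}`
}
```

Calling runExecution('exec-1') returns a promise that stays pending until deleteAsyncAction('exec-1') is called, which mirrors how the real execution stays paused until the token is sent back.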
To demonstrate this pattern with a real-life example, I will implement a simplified version of an e-commerce order processing system using the Serverless Framework and Node.js. For simplicity's sake, the system is triggered by simple API endpoints, and the UI part will not be implemented.
Example
A customer places an order by calling the POST /order API endpoint, which starts a processOrder state machine execution. The order is created with status PENDING, and the execution pauses until the online store staff update the status to PROCESSING using the POST /order/update API call. Once that is done, the status can be updated again through the same endpoint, this time to SENT. The customer can check the current status at any time with GET /order/{id}.
Demo repository
Let's start by defining the base of our Serverless service.
service: step-functions-async-actions
plugins:
  - serverless-step-functions
provider:
  name: aws
  runtime: nodejs14.x
  region: eu-central-1
We define two DynamoDB tables: orders tracks each order and its status, and orderProcessingActions keeps the information about paused executions. Both tables use a partition key as the primary key:
- orders: the primary key is orderId. For this example, I am using the state machine's unique execution ID. However, any ID that uniquely identifies an order will work (e.g. one generated by the uuid NPM package).
- orderProcessingActions: the primary key is executionId. For this example, it is also the state machine's unique execution ID. Note that the store staff will need this ID to update the status of the order.
We'll need to read, update and delete records in these tables, so this is also the right time to set the permissions properly. Let's also expose the table names as environment variables for later use in the Lambda functions.
Now we can proceed with the state machine definition. Let's keep it simple and define an HTTP POST event within the state machine, which will start the execution.
Later we can call POST /order with a JSON body.
The order status will be set and tracked in the state machine. A Pass state named Set PENDING status is used here, and the status PENDING is added to the state input using the Parameters property.
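As a rough sketch, such a Pass state could look like the following. It is written as a plain JavaScript object, which maps one-to-one to the JSON or YAML state definition. The order field and the Next target are assumptions on my side; adjust them to whatever the POST body actually contains.

```javascript
// Hypothetical definition of the "Set PENDING status" state.
const setPendingStatus = {
  Type: 'Pass',
  Parameters: {
    // Keys ending in ".$" are resolved from the state input via JSONPath.
    // Assumption: the request body carries the order under "order".
    'order.$': '$.order',
    // Keys without the suffix are static values.
    status: 'PENDING'
  },
  Next: 'Create order record'
}

console.log(JSON.stringify({ 'Set PENDING status': setPendingStatus }, null, 2))
```

Keep in mind that Parameters builds a new object, so any input field that is not listed there is not carried over to the next state.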
Next, we want to create a record in the orders table. For this straightforward operation, we can call the DynamoDB API directly from the Create order record step definition.
The execution ID from the context object is used as the orderId. Also, notice that the ResultPath property is set to null. This indicates that we don't want to pass the result of the DynamoDB putItem operation to the step output. With null, the result is discarded and the step's input is passed through as its output. Check the ResultPath documentation for detailed information.
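Here is a sketch of what this step could look like, again as a JavaScript object that mirrors the state definition. The table name and the stored attributes are assumptions; in a Serverless stack the table name would usually be a reference to the table resource.

```javascript
// Hypothetical definition of the "Create order record" state.
const createOrderRecord = {
  Type: 'Task',
  // Direct service integration: no Lambda function in between
  Resource: 'arn:aws:states:::dynamodb:putItem',
  Parameters: {
    TableName: 'orders', // assumption: replace with the real table name
    Item: {
      // "$$" reads from the context object; Execution.Id is the execution's ID
      orderId: { 'S.$': '$$.Execution.Id' },
      // "$" reads from the state input, where the Pass state put the status
      status: { 'S.$': '$.status' }
    }
  },
  // null discards the putItem result and passes the input through unchanged
  ResultPath: null,
  Next: 'Notify customer PENDING'
}

console.log(JSON.stringify({ 'Create order record': createOrderRecord }, null, 2))
```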
Let's also create a simple API endpoint for checking the current status of an order.
getOrder:
  handler: src/getOrder.handler
  events:
    - http:
        path: order/{id}
        method: GET
        request:
          parameters:
            paths:
              id: true
We read from the table using the id path parameter.
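'use strict'
const AWS = require('aws-sdk')
const db = new AWS.DynamoDB.DocumentClient()

module.exports.handler = async (event) => {
  // Look up the order by the {id} path parameter
  const result = await db.get({
    TableName: process.env.ORDERS_TABLE_NAME,
    Key: {
      orderId: event.pathParameters.id,
    }
  }).promise()

  // get() leaves Item undefined when no record matches the key
  if (!result.Item) {
    return {
      statusCode: 404,
      body: JSON.stringify({ message: 'Order not found' })
    }
  }

  return {
    statusCode: 200,
    body: JSON.stringify(result.Item)
  }
}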
It would be nice to let customers know that something is happening with their order. Let's notify them in the next step, Notify customer PENDING.
The resource needed for this step is a notifyCustomer Lambda function, so let's add it to the stack.
In reality, the function would send a notification email or a message to AWS SQS (to be processed by a different service). For the sake of simplicity, the message is just logged to the console here.
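A minimal sketch of such a handler could look like this. The message texts are made up, and I am assuming the step passes the current status in the event. What the handler should return depends on how the step is wired into the state machine, so the sketch only logs.

```javascript
'use strict'

// Maps an order status to a customer-facing message (hypothetical wording).
function messageFor(status) {
  switch (status) {
    case 'PENDING':
      return 'We have received your order.'
    case 'PROCESSING':
      return 'Your order is being processed.'
    case 'SENT':
      return 'Your order has been sent.'
    default:
      return `Your order status is: ${status}`
  }
}

// Lambda handler: logs the message instead of actually sending it.
const handler = async (event) => {
  console.log(messageFor(event.status))
}

module.exports = { handler, messageFor }
```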
Now is the right time to pause the process until a store employee updates the status. That happens in the following Wait for status update from PENDING step.
A createAction Lambda function is invoked with the Payload object as its parameter. The waitForTaskToken suffix in the Resource definition indicates that this step pauses the execution and waits until another part of the system calls sendTaskSuccess or sendTaskFailure with the token that points to this exact step. That token is read from the context object and passed as taskToken in the Payload property. Let's check what the createAction handler does. After defining the function in serverless.yml, we create a record in the OrderProcessingActionsTable.
'use strict'
const AWS = require('aws-sdk')
const db = new AWS.DynamoDB.DocumentClient()

module.exports.handler = async (event) => {
  await db.put({
    TableName: process.env.ORDER_PROCESSING_ACTIONS_TABLE_NAME,
    Item: {
      executionId: event.executionId,
      status: event.status,
      taskToken: event.taskToken
    }
  }).promise()
}
So what does this record mean? It says that:
- a state machine execution with ID executionId
- is on hold at the step identified by the unique taskToken
- and the status of the order is status (could be PENDING, PROCESSING, SENT)
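For reference, the waiting step that produces this record could be sketched as follows, written as a JavaScript object that mirrors the state definition. The FunctionName value and the Next target are assumptions; the payload keys match what the createAction handler reads from its event.

```javascript
// Hypothetical definition of the "Wait for status update from PENDING" state.
const waitForStatusUpdateFromPending = {
  Type: 'Task',
  // The ".waitForTaskToken" suffix turns the invocation into a callback task
  Resource: 'arn:aws:states:::lambda:invoke.waitForTaskToken',
  Parameters: {
    FunctionName: 'createAction', // assumption: usually the function name or ARN
    Payload: {
      // Both values come from the context object ("$$")
      'executionId.$': '$$.Execution.Id',
      'taskToken.$': '$$.Task.Token',
      // The status comes from the state input ("$")
      'status.$': '$.status'
    }
  },
  Next: 'Set PROCESSING status'
}

console.log(JSON.stringify(waitForStatusUpdateFromPending, null, 2))
```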
From this point, an online store employee can check the table and see the orders waiting to be processed. To unpause the process and update the status of an order, they need an API endpoint, so let's define one called updateOrder that invokes the finishAction Lambda function.
This function checks whether a particular execution is waiting to be unpaused. If it is, we call sendTaskSuccess with the taskToken from the record, which lets the execution continue. The output parameter is set to a stringified empty object, as we don't need to return anything to the execution in this example.
After that, we can delete the record.
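A hedged sketch of such a handler is shown below. To keep it testable without AWS, the clients are injected: db is expected to behave like AWS.DynamoDB.DocumentClient and stepFunctions like AWS.StepFunctions from the aws-sdk v2 package. The shape of the request body and the 404 response are my assumptions.

```javascript
'use strict'

// Builds the finishAction handler around injected AWS clients.
function createFinishAction({ db, stepFunctions, tableName }) {
  return async (event) => {
    // Assumption: the request body looks like {"executionId": "..."}
    const { executionId } = JSON.parse(event.body || '{}')

    // Is this execution actually waiting to be unpaused?
    const { Item } = await db.get({
      TableName: tableName,
      Key: { executionId }
    }).promise()
    if (!Item) {
      return {
        statusCode: 404,
        body: JSON.stringify({ message: 'No paused execution found' })
      }
    }

    // Resume the paused step; output has to be a JSON string
    await stepFunctions.sendTaskSuccess({
      taskToken: Item.taskToken,
      output: JSON.stringify({})
    }).promise()

    // The execution is no longer paused, so the record can be removed
    await db.delete({
      TableName: tableName,
      Key: { executionId }
    }).promise()

    return { statusCode: 200, body: JSON.stringify({ executionId }) }
  }
}

module.exports = { createFinishAction }
```

In the deployed function, the factory would be called once with new AWS.DynamoDB.DocumentClient(), new AWS.StepFunctions() and process.env.ORDER_PROCESSING_ACTIONS_TABLE_NAME, and the result exported as the handler.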
The permissions have to be updated as well.
The execution continues to Set PROCESSING status, which sets the status property the same way as the Set PENDING status step. It is followed by Update order record PROCESSING, which updates the order in the orders table using the DynamoDB API directly, and then by Notify customer PROCESSING and Wait for status update from PROCESSING. The latter works the same as the previous waiting step: it waits for sendTaskSuccess() to be called by the finishAction handler.
So this part does pretty much the same as the "pending status" part of the state machine. The only difference is that the order record is not created here, just updated.
We can also reuse this approach to cover the last order status: SENT.
This state machine is now ready to cover the complete lifecycle of the order, from the PENDING status to PROCESSING and finally SENT.
One last point that needs to be covered is an ERROR status that might appear if something is wrong with the order, for example when an item is out of stock or the order simply cannot be sent to the customer.
Let's update the finishAction handler so that it also accepts a "success" property in the event body.
The permissions have to be updated once again.
Now, if the store employee is for some reason unable to process the order, they can set success to false in the POST /order/update payload, and the handler will invoke sendTaskFailure() instead of sendTaskSuccess().
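The branching itself can be sketched as a small helper. stepFunctions is again expected to behave like AWS.StepFunctions from aws-sdk v2; the error name, the cause text, and the choice to treat a missing success property as true are assumptions of this sketch.

```javascript
'use strict'

// Reads the "success" flag from the request body.
// Assumption: anything other than an explicit false counts as success.
function parseSuccess(body) {
  return JSON.parse(body || '{}').success !== false
}

// Resumes or fails the paused step, depending on the flag.
async function settleTask(stepFunctions, taskToken, success) {
  if (success) {
    await stepFunctions.sendTaskSuccess({
      taskToken,
      output: JSON.stringify({})
    }).promise()
    return 'success'
  }

  await stepFunctions.sendTaskFailure({
    taskToken,
    error: 'OrderProcessingFailed', // hypothetical error name
    cause: 'The store employee could not process the order'
  }).promise()
  return 'failure'
}

module.exports = { parseSuccess, settleTask }
```

In the handler this would replace the plain sendTaskSuccess call, for example settleTask(stepFunctions, Item.taskToken, parseSuccess(event.body)).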
This failure, of course, has to be handled in the state machine definition.
We add the Catch field to each "wait" task.
Once sendTaskFailure() is invoked, the "wait" step throws an error that is caught based on the ErrorEquals definition, in this case States.TaskFailed. Read more about other error names in the official documentation.
Catch also defines the step that the state machine should continue with in case of an error. In this case it is Set ERROR status, followed by the final Notify customer ERROR step. The switch in the notifyCustomer handler has to be updated as well, so that the ERROR status also has its own message.
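A small sketch of how the Catch field could be attached to a "wait" task, again using JavaScript objects that mirror the state definition. The ResultPath of $.error is an assumption; it keeps the original input and stores the error details next to it.

```javascript
// Returns a copy of a state definition with the error handler attached.
function withErrorCatch(state) {
  return {
    ...state,
    Catch: [
      {
        // Matches the error raised when sendTaskFailure() is called
        ErrorEquals: ['States.TaskFailed'],
        // Assumption: keep the input and add the error details under "error"
        ResultPath: '$.error',
        // Where the execution continues after the error is caught
        Next: 'Set ERROR status'
      }
    ]
  }
}

// Example with a shortened, hypothetical "wait" task
const waitTask = {
  Type: 'Task',
  Resource: 'arn:aws:states:::lambda:invoke.waitForTaskToken',
  Next: 'Set PROCESSING status'
}

console.log(JSON.stringify(withErrorCatch(waitTask), null, 2))
```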
That's it. This is what the state machine looks like after these final changes.
Wrap up
I hope you now have a better understanding of how other services can dynamically interact with AWS Step Functions. This pattern is flexible enough to cover more complex requirements as well.
For example, the action record doesn't have to be deleted from the OrderProcessingActionsTable. A flag can be set there to indicate that the action was already handled, and together with a timestamp and an external service identifier you get a history record of who interacted with the state machine and when.
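As an illustration of that idea, the delete call could be swapped for an update. The helper below only builds the parameters for DocumentClient.update(); the attribute names handled, handledAt and handledBy are hypothetical.

```javascript
// Builds DocumentClient.update() params that flag an action as handled.
function buildMarkHandledParams({ tableName, executionId, handledBy, handledAt }) {
  return {
    TableName: tableName,
    Key: { executionId },
    UpdateExpression:
      'SET #handled = :handled, #handledAt = :handledAt, #handledBy = :handledBy',
    // Placeholders avoid clashes with DynamoDB reserved words
    ExpressionAttributeNames: {
      '#handled': 'handled',
      '#handledAt': 'handledAt',
      '#handledBy': 'handledBy'
    },
    ExpressionAttributeValues: {
      ':handled': true,
      ':handledAt': handledAt,
      ':handledBy': handledBy
    }
  }
}

const params = buildMarkHandledParams({
  tableName: 'orderProcessingActions', // assumption: the real table name
  executionId: 'example-execution-id',
  handledBy: 'store-backoffice',
  handledAt: new Date().toISOString()
})
console.log(params.UpdateExpression)
```

The handler would then call db.update(params).promise() where it currently deletes the record.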
In a more complex service I'd also cover more steps with Catch error handling, especially those where the DynamoDB API is involved.
That's it for now. If you have any questions, feel free to contact me on Twitter @fekecrad.