Transform your Domain into a process-centric domain
It enables you to send actions to your domain, put rules over these actions’ transitions,
and separate process rules classes from your aggregate class and invariants.
🎯Two main purposes (2 points of view):
💻Available in
- reactive functional programming (spring reactor) mode
- normal mode (imperative programming)
Prerequisite: JAVA 17 or higher
Beyond basic business logic, domains often incorporate state-based rules.
When these state-dependent rules come into the picture, the domain’s overall structure
becomes more complex over time, making both evolutionary changes and maintenance more difficult.
Managing the state complexity often leads us to use state machines or business process management (BPM) tools.
While valuable, these tools operate as distinct units from the domain, leading to
When using flow-driven-domain, your domain becomes more process-centric:
Please read the Hello World example as a tutorial, then try a complete POC application here
Let’s consider a HELLO WORLD example, where your domain is a Greeting aggregate with a simple value object, message, of type String
public class Greeting {
private UUID id;
private String message;
public void updateMessage(String message) {
this.message = message;
}
public void reset() {
this.message = "";
}
public String getMessage() {
return message;
}
}
Now let’s imagine we want to add a process/flow over our Greeting domain so that it produces the “Hello World” greeting message in 2 steps/actions.
Let’s also imagine we have these rules:
1- The WORLD action can’t be executed without a previous HELLO action
2- After the HELLO action, if the WORLD action is not executed within 30 seconds, the Greeting domain will reset
3- Once WORLD is executed, the Greeting can’t be changed anymore (it reaches a final state)
Let’s explore how we can use the framework to reach our goal.
Add the library dependency, here is a gradle example:
repositories {
mavenCentral()
}
dependencies {
implementation "io.github.progmodek:flow-reactive:1.0.1" // "io.github.progmodek:flow:1.0.1" for non-reactive
}
Define ACTIONS, STATES and a process/flow JSON file based on these ENUMs.
Define a java enum containing the actions your process supports
@Getter
@RequiredArgsConstructor // Lombok: generates the constructor used by HELLO(USER), etc.
public enum GreetingAction implements FlowAction {
    HELLO(USER),
    WORLD(USER),
    TIMEOUT(SYSTEM);
    private final ActionType type;
}
USER vs SYSTEM indicates whether it is a user action or a system action (TIMEOUT is a system action invoked automatically by the system).
Define a java enum containing all the states of your process
public enum GreetingState implements FlowState {
INITIAL,
PENDING_COMPLETION,
EXPIRED,
COMPLETED
}
INITIAL : initial state when we create our Greeting domain
PENDING_COMPLETION : intermediate state (after a HELLO action)
EXPIRED, COMPLETED: final states where no more actions are acceptable
This enum lets you define different flow types (e.g. DEFAULT); each flow type is based on an action enum and a state enum.
@RequiredArgsConstructor // Lombok: generates the constructor used by DEFAULT(...)
public enum GreetingFlowType implements FlowType {
DEFAULT("default-flow.json", GreetingAction.class, GreetingState.class);
@Getter
private final String template;
@Getter
private final Class<? extends FlowAction> flowActionType;
@Getter
private final Class<? extends FlowState> flowStateType;
}
This means that the DEFAULT flow type is based on the GreetingAction and GreetingState enums, and is described by the default-flow.json file.
You can have, for example, another type EXPRESS with another JSON file defining another flow.
{
"actions": [
{
"name": "HELLO",
"delegate": "helloDelegate"
},
{
"name": "WORLD",
"delegate": "worldDelegate"
},
{
"name": "TIMEOUT",
"delegate": "timeoutDelegate",
"expiration": true
}
],
"states": [
{
"name": "INITIAL",
"initial": true,
"transitions": [
{
"action": "HELLO",
"internal": "PENDING_COMPLETION"
}
]
},
{
"name": "PENDING_COMPLETION",
"transitions": [
{
"action": "WORLD",
"result": {
"success": "COMPLETED"
}
},
{
"action": "TIMEOUT",
"result": {
"success": "EXPIRED"
},
"timer": {
"sec": 30
}
}
]
},
{
"name": "EXPIRED",
"transitions": []
},
{
"name": "COMPLETED",
"transitions": []
}
]
}
Explanations:
- We list the ACTIONS with their associated delegates (a delegate is invoked when its action is invoked).
- The TIMEOUT action also has “expiration=true”. This tells the system that the action is used for expiration, so it can compute “expiresAt” automatically based on the current state.
- The INITIAL state has “initial=true”, meaning that when we create our Greeting aggregate it starts in the INITIAL state.
- The INITIAL state has only 1 transition: the HELLO action, which transits to the PENDING_COMPLETION state.
- PENDING_COMPLETION has 2 transitions: one with the WORLD action, which transits to the COMPLETED state, and one with TIMEOUT (automatic, with a 30-second timer). This means that while in the PENDING_COMPLETION state, if no WORLD action is called, the flow automatically transits to EXPIRED after 30 seconds.
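To make the transition rules described above concrete, here is a minimal plain-Java sketch of the same state table. It does not use the framework: the class name GreetingTransitionsSketch, the nested enums and the next method are all hypothetical and only mirror the states and actions of default-flow.json.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java illustration only; none of these types come from the framework.
final class GreetingTransitionsSketch {

    enum State { INITIAL, PENDING_COMPLETION, EXPIRED, COMPLETED }

    enum Action { HELLO, WORLD, TIMEOUT }

    // Mirrors the "states"/"transitions" section of default-flow.json.
    private static final Map<State, Map<Action, State>> TRANSITIONS = new EnumMap<>(State.class);

    static {
        TRANSITIONS.put(State.INITIAL, Map.of(Action.HELLO, State.PENDING_COMPLETION));
        TRANSITIONS.put(State.PENDING_COMPLETION,
                Map.of(Action.WORLD, State.COMPLETED, Action.TIMEOUT, State.EXPIRED));
        TRANSITIONS.put(State.EXPIRED, Map.of());   // final state: no transitions
        TRANSITIONS.put(State.COMPLETED, Map.of()); // final state: no transitions
    }

    // Returns the target state, or empty when the action is not allowed in the current state.
    static Optional<State> next(State current, Action action) {
        return Optional.ofNullable(TRANSITIONS.get(current).get(action));
    }

    public static void main(String[] args) {
        System.out.println(next(State.INITIAL, Action.WORLD));   // rule 1: WORLD needs a previous HELLO
        System.out.println(next(State.INITIAL, Action.HELLO));   // allowed transition
        System.out.println(next(State.COMPLETED, Action.HELLO)); // rule 3: final state accepts nothing
    }
}
```

The sketch leaves out timers and delegates; it only shows which action is accepted in which state.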
Let your Domain Aggregate Implement the Flowable Interface
public interface Flowable<ID> {
ID getId();
String getState();
void setState(String state);
Flow getFlow();
void setFlow(Flow flow);
}
The Flowable interface requires the implementation of getters and setters for ID, State, and Flow properties, which are essential for a process-centric model.
So the greeting becomes:
@Data
public class Greeting implements Flowable<UUID> {
// your aggregate ID
private UUID id;
private String message;
// -----------------------------------------------------
// flow elements to add to your domain
// remember that your domain becomes process-centric,
// means it holds the flow history
private String state;
private Flow flow;
// -----------------------------------------------------
public void updateMessage(String message) {
    this.message = message;
}
// used by the TimeoutDelegate to reset the aggregate
public void reset() {
    this.message = "";
}
public String getMessage() {
return message;
}
}
NB: you have to persist these properties in your repository.
If you are using NoSQL this is straightforward;
if not, you must persist the Flow property as json or jsonb.
Define an ActionDelegate for each Action.
These delegates encapsulate the business logic needed for the corresponding action.
@Component
public class HelloDelegate implements ActionDelegate<Greeting, DelegateParams, Greeting> {
@Override
public Mono<Greeting> execute(final Greeting greeting, final Map<String, Object> variables, final DelegateParams delegateParams) {
// implement process-related logic, e.g. call an API
// and invoke (or not) a behavior on our aggregate
greeting.updateMessage("Hello");
return Mono.just(greeting);
}
}
@Component
public class WorldDelegate implements ActionDelegate<Greeting, DelegateParams, Greeting> {
@Override
public Greeting execute(final Greeting greeting, final Map<String, Object> variables, final DelegateParams delegateParams) {
// implement process-related logic, e.g. call an API (in our case nothing special)
// and invoke (or not) a behavior on our aggregate (in our case we update the message)
greeting.updateMessage("Hello World");
return greeting;
}
}
@Component
public class TimeoutDelegate extends SystemActionDelegate<Greeting> {
@Override
public Greeting execute(final Greeting greeting, final Map<String, Object> variables) {
// implement process-related logic, e.g. call an API (in our case nothing special)
// and invoke (or not) a behavior on our aggregate (in our case we reset the aggregate after a timeout)
greeting.reset();
return greeting;
}
}
Each delegate must implement the generic ActionDelegate<T extends Flowable, I, R>:
- T is your aggregate type
- I is the input type that you can pass to the delegate (a request object containing parameters)
- R is the return type (in our example it is the aggregate itself, but you can also return, for example, the response of an external API call)
Note: for the system action TIMEOUT we extend SystemActionDelegate, a special implementation of ActionDelegate that takes no input parameters and returns the aggregate.
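To illustrate the three type parameters, here is a small self-contained sketch of a delegate whose input (I) and return type (R) both differ from the aggregate. The ActionDelegate interface below is a local stand-in that only copies the execute shape used in the examples above (the Flowable bound is dropped so it compiles without the library). HelloRequest, GreetingSummary and PersonalizedHelloDelegate are hypothetical names, not part of the framework.

```java
import java.util.HashMap;
import java.util.Map;

final class DelegateTypesSketch {

    // Local stand-in for the framework's ActionDelegate<T extends Flowable, I, R>.
    interface ActionDelegate<T, I, R> {
        R execute(T aggregate, Map<String, Object> variables, I input);
    }

    // Simplified aggregate without the flow properties.
    static final class Greeting {
        private String message = "";

        void updateMessage(String message) {
            this.message = message;
        }

        String getMessage() {
            return message;
        }
    }

    // Hypothetical input type (I): a request object carrying parameters.
    record HelloRequest(String name) {}

    // Hypothetical return type (R): something other than the aggregate itself.
    record GreetingSummary(String message, int length) {}

    static final class PersonalizedHelloDelegate
            implements ActionDelegate<Greeting, HelloRequest, GreetingSummary> {
        @Override
        public GreetingSummary execute(Greeting greeting, Map<String, Object> variables, HelloRequest input) {
            // invoke a behavior on the aggregate, then build a separate response object
            greeting.updateMessage("Hello " + input.name());
            return new GreetingSummary(greeting.getMessage(), greeting.getMessage().length());
        }
    }

    public static void main(String[] args) {
        Greeting greeting = new Greeting();
        GreetingSummary summary = new PersonalizedHelloDelegate()
                .execute(greeting, new HashMap<>(), new HelloRequest("Ada"));
        System.out.println(summary);
    }
}
```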
Let your DomainRepository extend the FlowRepository interface
public interface FlowRepository<T extends Flowable, ID> {
Optional<T> findById(ID flowId);
T save(T flowable);
}
This interface contains only methods that already exist in Spring repositories, so it should be straightforward.
This means that if you are using Spring Data or Spring JPA, these methods are already handled.
Example for reactive mode:
public interface DomainRepository extends ReactiveCrudRepository<Greeting, UUID>, FlowRepository<Greeting, UUID> {
}
or in JPA mode:
public interface DomainRepository extends JpaRepository<Greeting, UUID>, FlowRepository<Greeting, UUID> {
}
The framework includes a base FlowRepository implementation for Postgres jsonb, which you can use to define your domain repository as follows:
@Bean
public FlowRepository<Greeting, UUID> greetingPostGresRepository() {
    return new BaseR2dbcPostgresJsonRepository<>(Greeting.class)
            .setTableInfo(table_name, jsonb_column);
}
The Flow property should always be stored in JSON format.
Best practice: use JSONB if you are using Postgres.
The following are some examples using Postgres JSONB.
If you are using JSONB for your whole aggregate, your table would be
CREATE TABLE IF NOT EXISTS greeting (
id uuid NOT NULL,
greeting_data jsonb NOT NULL,
CONSTRAINT flow_pk PRIMARY KEY (id)
);
If you are using JSONB only for the Flow property of your aggregate, your table would be
CREATE TABLE IF NOT EXISTS greeting (
id uuid NOT NULL,
message varchar NOT NULL,
flow_data jsonb NOT NULL,
CONSTRAINT flow_pk PRIMARY KEY (id)
);
You can of course use a database other than PostgreSQL.
Create a table where scheduled tasks will be saved (flow capabilities like timeouts or other scheduled automatic tasks), for example for Postgres:
CREATE TABLE IF NOT EXISTS flow_task (
id varchar NOT NULL,
score int8 NOT NULL,
status VARCHAR(255) NOT NULL,
ver INTEGER NOT NULL,
PRIMARY KEY (id)
);
Replace the types with your specific database types (e.g. VARCHAR(255), BIGINT, INTEGER)
The FlowEngine is pivotal in managing the lifecycle of flowable domain objects, interfacing with the repository to maintain state consistency and handle transitions.
From a DDD point of view it is the application layer (check the framework design).
Instantiate a FlowEngine with:
- the ID class type of your aggregate
- the aggregate class type
- the repository of your domain
@Bean
FlowEngine<Greeting, UUID> flowEngine(DomainRepository repo) {
return new FlowEngine<>(UUID.class, Greeting.class, repo);
}
Make your domain flowable and execute actions on your domain
Greeting greeting = new Greeting(); // create your aggregate as you normally would
flowEngine.makeFlowable(greeting, GreetingFlowType.DEFAULT, Map.of());
You can pass a Map that will be saved in the variables property of the Flow for later use.
Invoke Actions
flowEngine.applyAction(id, HELLO, inputParams);
And here you go: your domain is now process-centric (check the state and the flow properties)
{
  "id": "e30d2da8-f18e-4c43-bd84-bcffb726cb37",
  "message": "Hello",
  "flow": {
    "flowType": "DEFAULT",
    "eligibleActions": ["WORLD"],
    "actions": [
      {
        "name": "HELLO",
        "type": "USER",
        "count": 1,
        "executions": [
          {
            "executedAt": "2023-08-23T15:48:11.091511Z",
            "result": "success",
            "previousState": "INITIAL",
            "nextState": "PENDING_COMPLETION"
          }
        ]
      }
    ]
  },
  "state": "PENDING_COMPLETION"
}
We can see when the HELLO action was executed and how the state moved to PENDING_COMPLETION. The flow also exposes an expiresAt property, which indicates that the Greeting will expire 30 seconds after the HELLO action.
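As a rough illustration of that expiry time, the sketch below assumes that expiresAt is simply the instant at which the state was entered plus the timer of the expiration transition. That formula is an assumption made for this example; the framework's actual computation is not shown in this README. ExpiresAtSketch and its expiresAt method are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

final class ExpiresAtSketch {

    // Assumption: expiry = time the state was entered + timer of the TIMEOUT transition.
    static Instant expiresAt(Instant enteredStateAt, Duration timer) {
        return enteredStateAt.plus(timer);
    }

    public static void main(String[] args) {
        // executedAt of the HELLO action in the JSON above
        Instant executedAt = Instant.parse("2023-08-23T15:48:11.091511Z");
        // "timer": { "sec": 30 } from default-flow.json
        Duration timer = Duration.ofSeconds(30);
        System.out.println(expiresAt(executedAt, timer)); // 2023-08-23T15:48:41.091511Z
    }
}
```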
Each executed Action is considered as a flow Event.
All flow events are pushed to EventsPublishers, implementing custom event publishers allows you to have an event-driven architecture.
You can define your own Event Publisher:
@Component
@Slf4j
public class KafkaEventPublisher implements EventsPublisher {
@Override
public void publishEvents(final Flowable aggregate) {
log.info("PUBLISH EVENTS TO KAFKA IF YOU WANT");
}
}
The framework includes an ActionLogger, an EventsPublisher that logs every executed action.
You can define your own logger as an EventsPublisher and disable the framework one.
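The idea of pushing each flow event to several publishers can be sketched without the framework as follows. The EventsPublisher interface here is a local stand-in (the real one receives a Flowable, as in the KafkaEventPublisher example), and GreetingSnapshot, InMemoryPublisher, LoggingPublisher and publishToAll are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class EventPublishingSketch {

    // Local stand-in for the framework's EventsPublisher.
    interface EventsPublisher<T> {
        void publishEvents(T aggregate);
    }

    // Hypothetical, simplified view of the aggregate after an action was executed.
    record GreetingSnapshot(String state, String message) {}

    // Keeps published snapshots in memory, e.g. for tests or auditing.
    static final class InMemoryPublisher implements EventsPublisher<GreetingSnapshot> {
        final List<GreetingSnapshot> published = new ArrayList<>();

        @Override
        public void publishEvents(GreetingSnapshot aggregate) {
            published.add(aggregate);
        }
    }

    // Plays the role of a custom logger that replaces the default one.
    static final class LoggingPublisher implements EventsPublisher<GreetingSnapshot> {
        @Override
        public void publishEvents(GreetingSnapshot aggregate) {
            System.out.println("executed action, new state: " + aggregate.state());
        }
    }

    // Every registered publisher receives every event.
    static <T> void publishToAll(List<EventsPublisher<T>> publishers, T aggregate) {
        for (EventsPublisher<T> publisher : publishers) {
            publisher.publishEvents(aggregate);
        }
    }

    public static void main(String[] args) {
        InMemoryPublisher audit = new InMemoryPublisher();
        List<EventsPublisher<GreetingSnapshot>> publishers = new ArrayList<>();
        publishers.add(audit);
        publishers.add(new LoggingPublisher());
        publishToAll(publishers, new GreetingSnapshot("PENDING_COMPLETION", "Hello"));
        System.out.println(audit.published.size());
    }
}
```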
The framework design follows DDD principles.
BLUE : Framework Components
GREEN : Your components
The Port/Interface part that links the domain to the infra layer is not shown, to keep the diagram simple.
In the HelloWorld example we saw some features of the flow JSON file when defining actions, states and transitions.
Here is a list of most of the features and how to use them.
We can use these properties on each action defined in the JSON file
{
"name": "TIMEOUT",
"delegate": "timeoutDelegate",
"expiration": true
}
When an action is defined as an “expiration” action, an “expiresAt” property is computed whenever we are in a state that includes a transition with that action (here TIMEOUT), so users know explicitly when the process will expire.
We can use these properties on each state defined in the JSON file
Greeting greeting = new Greeting(); // create your aggregate as you normally would
flowEngine.makeFlowable(greeting, GreetingFlowType.DEFAULT, Map.of());
{
  "name": "INITIAL",
  "initial": true
}
we can use these properties on each transition defined in the JSON file
"transitions": [
{
"action": "APPROVE",
"result": {
"success": "APPROVED",
"error": "NOT_APPROVED"
},
"exceptions": {
"001": "PENDING_APPROVAL"
},
"retry": {
"number": 3,
"exceeded": "NOT_APPROVED"
},
"timer": {
"sec": 30
}
}
]
The APPROVE action will be launched after 30 seconds (timer).
If the functional result (from the ActionDelegate) is success, then transit to APPROVED.
If the functional result (from the ActionDelegate) is error, then transit to NOT_APPROVED.
If an exception handled by the exceptions part is thrown (a technical exception from the delegate, here 001), then transit to PENDING_APPROVAL.
If any other exception occurs (not handled by the exceptions part), then stay in the same state, but after 3 retries transit to NOT_APPROVED.
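The resolution rules above can be sketched in plain Java as follows. This is not the framework's implementation: Transition, Outcome, nextState and the retriesDone counter are hypothetical, and the sketch assumes that "after 3 retries" means the retry limit is reached once three retries have already been performed.

```java
import java.util.Map;

final class TransitionResolutionSketch {

    // Hypothetical in-memory form of one transition entry from the JSON file.
    record Transition(String onSuccess, String onError, Map<String, String> exceptions,
                      int retryNumber, String retryExceeded) {}

    enum Kind { SUCCESS, ERROR, EXCEPTION }

    // Hypothetical outcome of running a delegate; exceptionCode is only used for EXCEPTION.
    record Outcome(Kind kind, String exceptionCode) {}

    // The APPROVE transition from the JSON example above.
    static Transition approveTransition() {
        return new Transition("APPROVED", "NOT_APPROVED",
                Map.of("001", "PENDING_APPROVAL"), 3, "NOT_APPROVED");
    }

    // retriesDone = number of retries already performed before this outcome (assumption).
    static String nextState(String current, Transition t, Outcome outcome, int retriesDone) {
        return switch (outcome.kind()) {
            case SUCCESS -> t.onSuccess();
            case ERROR -> t.onError();
            case EXCEPTION -> {
                String code = outcome.exceptionCode();
                String mapped = code == null ? null : t.exceptions().get(code);
                if (mapped != null) {
                    yield mapped; // exception listed in the "exceptions" part
                }
                // unlisted exception: stay in place until the retries are used up
                yield retriesDone >= t.retryNumber() ? t.retryExceeded() : current;
            }
        };
    }

    public static void main(String[] args) {
        Transition approve = approveTransition();
        String current = "PENDING_APPROVAL";
        System.out.println(nextState(current, approve, new Outcome(Kind.SUCCESS, null), 0));
        System.out.println(nextState(current, approve, new Outcome(Kind.EXCEPTION, "999"), 1));
        System.out.println(nextState(current, approve, new Outcome(Kind.EXCEPTION, "999"), 3));
    }
}
```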
Here is a list of some use cases that can be thought as a flow driven domain:
A complete POC application for Preparation of Orders in a click-and-collect retail environment can be found here
Prerequisite: JAVA 17 or higher
Here are some properties you can configure in application.yaml.
Default values are already configured; set them only if you want to override the defaults.
flow:
  actionLogger:
    enabled: true # default logger component, enabled by default
  taskConsumer:
    scheduleMilli: 1000 # polling interval in milliseconds for async tasks (the system tasks that should be executed automatically)
The foundational concept of the Framework was originally crafted for Decathlon.
It was specifically tailored to address their checkout process needs and challenges.
This concept, a testament to our innovative approach, has been successfully implemented within Decathlon, demonstrating its effectiveness and reliability in a real-world, enterprise environment.
Building on this initial success, the concept was further evolved into a full-fledged framework, guided by principles of Domain-Driven Design (DDD), ensuring a robust and scalable architecture.
If you have any questions, need assistance with the framework, or want to discuss potential projects, please don’t hesitate to reach out.
We value your feedback and are eager to support your projects using this framework.
Let’s innovate and create together!