Skip to content

Concurrency

Flou's Network of Agents were designed from the ground up to support concurrency enabling easy representation of complex workflows out-of-the-box with minimum latency.

We have two types of concurrency. When a workflow forks into two or more distinct flows or when a workflow needs to be repeated several times concurrently.

Forking a workflow into different flows

When you need to execute many different tasks at the same time you just need to define a transition that has one outgoing LTM for each task.

Forking workflow example
Forking workflow example

You just need to replace the to parameter with an iterable of LTMs. In this case when launch_tasks is called TaskA, TaskB and TaskC statuses will be changed to queued and the Flou Engine will execute them concurrently.

Launching tasks concurrently example
class ConcurrentMachine(LTM):
    name = 'concurrent_machine'

    init = [InitialState]
    transitions = [
        {
            'label': 'launch_tasks',
            'from': InitialState,
            'to': [TaskA, TaskB, TaskC]
        }
    ]

Running an LTM several times concurrently

In some workflows or LLM patterns you need to run the same task several times with different parameters. For example, you need an LLM to process several uploaded files concurrently or you need to execute the same LLM prompt several times and choose the most common one (self-consistency) to achieve better performance.

Flou makes this easy with parameterized transitions. Just like regular Python f-strings you can label your transitions and LTM names with named parameters and then launch a transition with a set of parameters. Each parameter name has to be surrounded by {name} curly brackets.

Parameterized transition example
Parameterized transition example
Launching parameterized transitions example
class LaunchFilesProcessing(LTM):
    name = "launch_files_processing"

    def run(self, payload=None):
        self.transition(
            "process_file_{file_id}",
            params=[{"file_id": "1111"}, {"file_id": "2222"}],
        )


class ProcessingFile(LTM):
    name = "processing_file_{file_id}"

    def run(self, payload=None):
        # processing code here
        file_id = self.params['file_id']
        ...


class ConcurrentProcessing(LTM):
    name = "concurrent_processing"

    init = [LaunchFilesProcessing]
    transitions = [
        {
            "from": LaunchFilesProcessing,
            "label": "process_file_{file_id}",
            "to": ProcessingFile
        }
    ]

In this example when transitioning process_file_{file_id} with parameters {"file_id": "1111"}, {"file_id": "2222"}] two ProcessingFile will be launched with names: processing_file_1111 and processing_file_2222.

LTMs launched with parameterized transitions must have the same parameters in their names

In this case file_id is the parameter name and it's present in the LTM name processing_file_{file_id} and the transition process_file_{file_id}.

  • You can have as many parameters as needed in a parameterized transition.
  • Use self.params to get the params of the current executing LTM.
  • You can concurrently launch either States or sub State Machines.
  • Each parameterized LTM has it's own local store accessible via self.state.

Joining concurrent flows

If we want to join concurrent states after a fork we need a special State that handles the join. But first we need to understand how Flou handles updating the store.

Concurrently Updating the Store

When using concurrency you need to be very careful about updates to the store. Internally update_state updates the local memory store immediately but waits until the State execution finishes to update the database store atomically with just one call. This makes it difficult to work with concurrent stores when you have several LTMs updating it at the same time.

For this use case you can use LTM.atomic_state_append(key, value) that atomically and immediately appends value to a pre initialized list key in self.state and returns the updated list. This can be used when joining concurrent forks.

Joining forked workflows

Concurrency fork/join example
Concurrency fork/join example

In this case we want JoinTasks to transition done only when TaskA, TaskB and TaskC have executed correctly.

class ConcurrentJoinMachine(LTM):
    name = 'concurrent_join_machine'

    init = [InitialState]
    transitions = [
        {'label': 'launch_tasks', 'from': InitialState, 'to': [TaskA, TaskB, TaskC]},
        {'label': 'finished_task_a', 'from': TaskA, 'to': JoinTasks},
        {'label': 'finished_task_b', 'from': TaskB, 'to': JoinTasks},
        {'label': 'finished_task_c', 'from': TaskC, 'to': JoinTasks},
        { 'label': 'done', 'from': JoinTasks, 'to': Finished},
    ]

    def get_initial_state(self):
        return {'executed_tasks': []}

We now have 3 finished_tasks transitions from the 3 tasks to JoinTasks. We are initializing the store with an empty list executed_tasks that will store the tasks that have been already executed.

We can use the transitions payload parameter to indicate which task has been executed and is transitioning.

class TaskA(LTM):
    name = 'task_a'

    def run(self, payload=None):
        # run task A code
        ...

        self.transition('finished_task_a', payload='A')

Because JoinTasks will be called 3 times and possible at the same time we need to use atomic_state_append that will add an item to executed_tasks and return the new value atomically. This way we can guarantee that in only 1 of the 3 executions executed_tasks will have three items, hence only transitioning done once.

Note that we get which task is calling JoinTasks by looking at the transaction payload.

class JoinTasks(LTM):
    name = 'join_tasks'

    def run(self):
        executed_tasks = self.parent.atomic_state_append('executed_tasks', payload)
        if set(executed_tasks) == set(('A', 'B', 'C')):
            self.transition('done')

Joining parameterized LTMs

For parameterized transitions we need to create a similar Join State but in this case we might not know which or how many parameters where transitioned. We need to save the launched params in the parent's store so we can retrieve them in the Join.

Concurrency parameters join example
Concurrency parameters join example
class LaunchFilesProcessing(LTM):
    name = "launch_files_processing"

    def run(self, payload):
        uploaded_file_ids = payload['uploaded_file_ids']

        self.parent.update_state('launched_params', uploaded_file_ids)

        self.transition(
            "start_{file_id}",
            params=[
                {"file_id": file_id}
                for file_id in uploaded_file_ids
            ],
        )


class JoinProcessing(LTM):
    name = 'join_processing'

    def run(self):
        processed_files = self.parent.atomic_state_append('processed_files', payload)
        if set(processed_files) == set(self.parent.state['launched_params']):
            self.transition('done')