What problem are you trying to solve?

In building, operating and maintaining distributed systems (many computers working in concert), engineers face a tooling gap.

Within the confines of a single computer, we have shells (bash, csh, zsh, oil, etc.) and a suite of small programs which mesh together well enough to complete small tasks through ad-hoc automation. This is an enormous tooling win: it allows small tasks to be automated, at least for a time, with a minimum of effort and with tools close to hand.

Once work spans a network, communicating between computers is difficult with traditional tools, and communication failure becomes an ever-present concern. Traditional automation tools such as shells struggle here because they make features such as error handling difficult to implement.

Furthermore, in a distributed environment it cannot be assumed that any single machine will remain available to execute the automation. Automation authors are thus confronted not just with the need to use "real" programming languages but also with the need to implement a database-backed state machine which can "checkpoint" the job.
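To make that burden concrete, here is a minimal sketch of such a hand-written checkpointing job, assuming SQLite through Python's standard sqlite3 module as the database. The table layout, the step names and the run_step and run_job helpers are hypothetical, chosen only for illustration.

```python
import sqlite3

# Hypothetical steps of a small ingest/analyse/report job.
STEPS = ["ingest", "analyse", "report"]


def run_step(name):
  """Stand-in for real work, such as a call to another machine."""
  print(f"running {name}")


def run_job(conn, job_id):
  """Run whichever steps of job_id have not completed yet.

  Progress is checkpointed to the database after every step, so calling
  run_job again after a crash resumes instead of starting over.
  Returns the number of steps executed by this call.
  """
  conn.execute(
    "CREATE TABLE IF NOT EXISTS checkpoints (job_id TEXT PRIMARY KEY, step INTEGER)"
  )
  row = conn.execute(
    "SELECT step FROM checkpoints WHERE job_id = ?", (job_id,)
  ).fetchone()
  start = row[0] if row else 0
  for index in range(start, len(STEPS)):
    run_step(STEPS[index])
    # Record the state transition only once the step has finished;
    # the connection context manager commits the transaction.
    with conn:
      conn.execute(
        "INSERT OR REPLACE INTO checkpoints (job_id, step) VALUES (?, ?)",
        (job_id, index + 1),
      )
  return len(STEPS) - start


conn = sqlite3.connect(":memory:")
run_job(conn, "nightly")  # runs all three steps
run_job(conn, "nightly")  # nothing left to do
```

Because the checkpoint is written only after a step finishes, a crash between the two re-runs that step on resume, so every step must be safe to repeat.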

Taking a step back, this is an enormous failure of the languages we have available to describe workflow tasks. That users must write programs which define state machines, which in turn perform the desired task, shows that the available tools operate at the wrong level.

Airflow, for instance, succeeds at providing a "workflow" control flow graph abstraction which frees users from the concerns of implementing their own resumable state machines.

Consider this example from the Airflow documentation -

```python
from __future__ import annotations

import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label

with DAG(
  "example_branch_labels",
  schedule="@daily",
  start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
  catchup=False,
) as dag:
  ingest = EmptyOperator(task_id="ingest")
  analyse = EmptyOperator(task_id="analyze")
  check = EmptyOperator(task_id="check_integrity")
  describe = EmptyOperator(task_id="describe_integrity")
  error = EmptyOperator(task_id="email_error")
  save = EmptyOperator(task_id="save")
  report = EmptyOperator(task_id="report")

  ingest >> analyse >> check
  check >> Label("No errors") >> save >> report
  check >> Label("Errors found") >> describe >> error >> report
```

Compared to handwriting out all the nodes in the control flow graph and the requisite checkpointing to the database on state transitions, this is a considerable improvement. But if you stop and look at this control flow graph, it's expressing a simple branching operation for which we have much more conventional notation.

Consider the same workflow using "normal" Python syntax rather than the embedded workflow syntax -

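```python
def noop(*args):
  """Stand-in for real work: accepts any arguments and returns None."""
  return None


def check(result):
  # Pretend the integrity check always passes.
  return True


ingest = noop
analyse = noop
describe = noop
error = noop
save = noop
report = noop


def main():
  ingest()
  result = analyse()
  if check(result):
    save(result)
    report(result)
  else:
    describe()
    error()
    report()


if __name__ == "__main__":
  main()
```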

The program a developer authors is already a control flow graph. We have a name for "operators" or state nodes - they're just functions. We have a name and notation for transitions in the state chart - they're just sequential statements.
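As a hedged illustration of that claim, the sketch below uses Python's standard ast module to recover, from the source of the plain main() function, the same edges the Airflow example spells out with >> (minus the branch labels). It only understands straight-line calls and if/else on a call, and the helper names call_name, build_edges and workflow_edges are hypothetical.

```python
import ast

# Source of the plain-Python workflow, parsed rather than executed.
SOURCE = """
def main():
  ingest()
  result = analyse()
  if check(result):
    save(result)
    report(result)
  else:
    describe()
    error()
    report()
"""


def call_name(node):
  """Return the name of the first plain function call found under node, if any."""
  for sub in ast.walk(node):
    if isinstance(sub, ast.Call) and isinstance(sub.func, ast.Name):
      return sub.func.id
  return None


def build_edges(stmts, preds, edges):
  """Append (src, dst) edges for stmts, entered from the nodes in preds.

  Returns the nodes control can leave from, so branches can be joined.
  Assumes every if-test is a function call, as in SOURCE.
  """
  for stmt in stmts:
    if isinstance(stmt, ast.If):
      cond = call_name(stmt.test)
      edges.extend((p, cond) for p in preds)
      tails = build_edges(stmt.body, [cond], edges)
      tails += build_edges(stmt.orelse, [cond], edges)
      preds = list(dict.fromkeys(tails))  # de-duplicate, keep order
    else:
      name = call_name(stmt)
      if name is None:
        continue
      edges.extend((p, name) for p in preds)
      preds = [name]
  return preds


def workflow_edges(source):
  """Return the edge list of the first function defined in source."""
  func = ast.parse(source).body[0]
  edges = []
  build_edges(func.body, [], edges)
  return edges


for src, dst in workflow_edges(SOURCE):
  print(f"{src} >> {dst}")
```

Both branches end at the single report node, just as they do in the Airflow graph.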

Anything we need developers to write above that baseline represents friction specific to a workflow task which we should seek to minimize.

Temporal does better than Airflow and reflects this understanding. Building on ideas from Azure Durable Functions, its SDK leverages Python's async and await keywords to hijack program control flow and implement workflow features under the covers.
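The core trick can be modelled in a few lines. What follows is a toy sketch of the replay idea, not Temporal's or Durable Functions' actual implementation: each await suspends the workflow and hands a request to a driver, which either runs the step or, if a result for that step is already recorded in a journal, feeds the recorded result back without re-running it. The Step class, the run driver and the example workflow are hypothetical; a real engine would persist the journal and require workflow code to be deterministic so that replay follows the same path.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Step:
  """A request, made by awaiting it, for the driver to run fn(*args)."""
  fn: Callable[..., Any]
  args: tuple = ()

  def __await__(self):
    # Suspend the workflow, hand this request to the driver,
    # and resume with whatever value the driver sends back.
    result = yield self
    return result


def run(workflow: Callable[[], Any], journal: List[Any]) -> Any:
  """Drive workflow() to completion, replaying results already in journal.

  journal holds the results of earlier steps (a stand-in for durable history).
  Steps past the end of the journal are executed for real and appended.
  """
  coro = workflow()
  step = 0
  value = None
  while True:
    try:
      request = coro.send(value)
    except StopIteration as done:
      return done.value
    if step < len(journal):
      value = journal[step]  # replay: reuse the recorded result
    else:
      value = request.fn(*request.args)  # first execution of this step
      journal.append(value)
    step += 1


# Example workflow: ordinary sequential code apart from the awaits.
executed = []


def double(x):
  executed.append(x)
  return x * 2


async def example():
  a = await Step(double, (1,))
  b = await Step(double, (a,))
  return a + b


journal = []
print(run(example, journal))  # 6; both steps execute
print(run(example, journal))  # 6; both steps replayed from the journal
```

Running the workflow again against the journal of a crashed run skips the finished steps and resumes where it stopped, which is what lets the user's code read as ordinary sequential logic.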

Consider this example from the Temporal documentation -

```python
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import NoReturn

from temporalio import activity, workflow


# Input type assumed for this example; its field types are illustrative.
@dataclass
class ComposeArgsInput:
  arg1: int
  arg2: int


@activity.defn
async def cancellable_activity(input: ComposeArgsInput) -> NoReturn:
  try:
    while True:
      print("Heartbeating cancel activity")
      await asyncio.sleep(0.5)
      activity.heartbeat("some details")
  except asyncio.CancelledError:
    print("Activity cancelled")
    raise


@workflow.defn
class GreetingWorkflow:
  @workflow.run
  async def run(self, input: ComposeArgsInput) -> None:
    activity_handle = workflow.start_activity(
      cancellable_activity,
      ComposeArgsInput(input.arg1, input.arg2),
      start_to_close_timeout=timedelta(minutes=5),
      heartbeat_timeout=timedelta(seconds=30),
    )

    await asyncio.sleep(3)
    activity_handle.cancel()
```

This is really good compared to an equivalent Airflow graph! All the details are "normal" Python, and the SDK fits "natively" into how Python execution occurs. But it's still laden with syntax, such as async function coloring and decorators, which serves only to support the workflow SDK.

In comparison, were this workflow a "simple" Python script, it would only need to be written as:

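```python
# https://pypi.org/project/timeoutcontext/
# Illustrative: task_with_timeout stands in for a timeout/cancellation helper;
# check the library's documentation for its exact API.
from datetime import timedelta
from time import sleep

from timeoutcontext import task_with_timeout, TimeoutError


def cancellable_activity():
  try:
    while True:
      print("Heartbeating cancellable activity")
      sleep(0.5)
  except TimeoutError:
    print("Activity cancelled")
    raise


def main():
  task = task_with_timeout(cancellable_activity,
                           timeout=timedelta(minutes=5))
  sleep(3)
  task.cancel()
```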
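The timeoutcontext helper in the snippet above is used illustratively. As a hedged, standard-library-only sketch of the same shape, a cancellable task handle can be built from threading. The Task class is hypothetical, and because Python threads cannot be forcibly stopped, the activity here polls a threading.Event instead of receiving an exception.

```python
import threading
import time
from datetime import timedelta


class Task:
  """Hypothetical cancellable task handle built on the standard library.

  Python threads cannot be killed from outside, so cancellation is
  cooperative: the activity receives an Event and must check it.
  """

  def __init__(self, fn, timeout):
    self.cancelled = threading.Event()
    self._thread = threading.Thread(target=fn, args=(self.cancelled,), daemon=True)
    # Cancel automatically once the timeout elapses.
    self._timer = threading.Timer(timeout.total_seconds(), self.cancel)
    self._thread.start()
    self._timer.start()

  def cancel(self):
    self.cancelled.set()
    self._timer.cancel()

  def join(self):
    self._thread.join()


def cancellable_activity(cancelled):
  # wait() returns False on timeout and True as soon as the event is set.
  while not cancelled.wait(0.5):
    print("Heartbeating cancellable activity")
  print("Activity cancelled")


def main():
  task = Task(cancellable_activity, timeout=timedelta(minutes=5))
  time.sleep(3)
  task.cancel()
  task.join()
```

Calling main() runs the activity for about three seconds, printing a heartbeat roughly every half second, before cancelling it.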

As with Airflow, the Temporal SDK effectively requires the programmer to learn not just a set of libraries but also Python's async features, because the implementation of the workflow engine leaks through to users. The problem isn't just the excessive syntax; it's that, as with Airflow, user workflows are no longer "normal" programs. In effect, an entire Temporal interpreter is stacked in between the Python runtime users are familiar with and the user's program. It is a new language with none of the notational advantages of being one.

The flip side is that there is an enormous tooling advantage to be had by leveraging an existing language (or something that looks enough like one) rather than inventing a new notation. This is the cardinal sin of workflow tools like Kubeflow and various CI "workflow" formats: they adopt unique YAML- or XML-based notations which have no tooling support. By contrast, because it is "normal"(ish) Python, the Temporal SDK benefits from editor autocompletion, the mypy typechecker and all manner of other tools.