Are you tired of relying on Directed Acyclic Graphs (DAGs) for your workflow orchestration? Consider transitioning from Apache Airflow to Temporal. With its focus on scalability and fault tolerance, Temporal is well suited to complex workflows. Migrating from Airflow to Temporal can improve error handling and even pay down looming tech debt. In this post, we’ll walk through a basic example in Apache Airflow and show how to structure the same example in Temporal. Note: This post assumes you have a Temporal server running; if you aren’t sure how to start one, see the Temporal documentation.
Examples in this post are written in Python, as that’s the language supported by Apache Airflow.
Apache Airflow Project
Let’s go through the pieces of your DAG within Apache Airflow. Here’s the current DAG definition:
```python
import random
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook

default_args = {
    'owner': 'EK',
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}


def long_process_method(results):
    for person_data in results:
        # Add a random number of additional hobbies
        num_additional_hobbies = random.randint(1, 3)
        additional_hobbies = [f"Extra Hobby {i}" for i in range(1, num_additional_hobbies + 1)]
        person_data["hobbies"].extend(additional_hobbies)
        # Add a random skill level to each hobby
        hobby_skill_levels = {hobby: random.randint(1, 10) for hobby in person_data["hobbies"]}
        person_data["hobby_skill_levels"] = hobby_skill_levels
        # Add a random boolean indicating if the person has a pet
        person_data["has_pet"] = random.choice([True, False])
    return results


@dag(
    dag_id='our_sample_dag',
    default_args=default_args,
    start_date=datetime(2023, 1, 21),
    schedule_interval=None
)
def our_sample_dag():
    @task()
    def long_running_function():
        return [
            {"name": "Alice", "age": 25, "address": "123 Main St", "hobbies": ["Reading", "Painting"]},
            {"name": "Bob", "age": 30, "address": "456 Oak St", "hobbies": ["Cooking", "Gardening"]},
            {"name": "Charlie", "age": 22, "address": "789 Pine St", "hobbies": ["Swimming", "Coding"]},
            {"name": "David", "age": 28, "address": "101 Elm St", "hobbies": ["Playing Guitar", "Photography"]},
            {"name": "Eve", "age": 35, "address": "202 Birch St", "hobbies": ["Traveling", "Hiking"]}
        ]

    @task()
    def perform_second_long_task(results):
        massaged_data = long_process_method(results)
        return massaged_data

    sql_data = long_running_function()
    perform_second_long_task(sql_data)


complicated_dag = our_sample_dag()
```
The long_running_function might be your project’s method for performing a SQL query and returning the results. The long_process_method, in turn, might be how you massage that data before sending it back to your client. This is a simple use case, but let’s focus on what happens if the DAG fails while running long_process_method. When the DAG fails and the workflow is re-run, the data is re-fetched by performing another SQL query. If hundreds (if not thousands) of operations are hitting your database, errors that trigger refetches like these can bog down other workflows and processes. In Temporal, the re-run or “retry” resumes after the completed SQL query, so the query isn’t executed a second time.
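To make the difference concrete, here is a toy sketch in plain Python. It is not Temporal’s actual machinery, just an illustration: re-running a workflow from scratch repeats every step, while a checkpoint-style retry repeats only the step that failed. All names here are hypothetical.

```python
# Toy illustration (not Temporal's real machinery): a full restart re-runs
# every step, while a checkpoint-style retry re-runs only the failed step.

query_calls = {"count": 0}

def sql_query():
    # Stands in for long_running_function's expensive database query.
    query_calls["count"] += 1
    return [{"name": "Alice"}]

def make_flaky_process():
    # Stands in for long_process_method; the returned function fails exactly once.
    state = {"failed": False}
    def process(results):
        if not state["failed"]:
            state["failed"] = True
            raise RuntimeError("transient failure")
        return results
    return process

def run_with_full_restart(process):
    # Airflow-style re-run: every retry starts over, repeating the query.
    while True:
        try:
            return process(sql_query())
        except RuntimeError:
            continue

def run_with_checkpoint(process):
    # Temporal-style retry: the query result is kept; only the failed step repeats.
    saved = None
    while True:
        try:
            if saved is None:
                saved = sql_query()
            return process(saved)
        except RuntimeError:
            continue
```

With one transient failure, the full-restart version performs the query twice; the checkpointed version performs it once.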
You may have other reasons for switching from Apache Airflow to Temporal, but let’s go into how you can port over your example code into Temporal.
Temporal Project
Your two methods, long_running_function and long_process_method, can be modeled as Activities in Temporal. An Activity is a normal function or method that (ideally) performs a single, well-defined action and is allowed to be non-deterministic. Let’s create an activities.py file and write the following code:
```python
import random

from temporalio import activity


@activity.defn
def long_running_function():
    # Let this be your complicated SQL statement that returns the data
    return [
        {"name": "Alice", "age": 25, "address": "123 Main St", "hobbies": ["Reading", "Painting"]},
        {"name": "Bob", "age": 30, "address": "456 Oak St", "hobbies": ["Cooking", "Gardening"]},
        {"name": "Charlie", "age": 22, "address": "789 Pine St", "hobbies": ["Swimming", "Coding"]},
        {"name": "David", "age": 28, "address": "101 Elm St", "hobbies": ["Playing Guitar", "Photography"]},
        {"name": "Eve", "age": 35, "address": "202 Birch St", "hobbies": ["Traveling", "Hiking"]}
    ]


@activity.defn
def long_process_method(results):
    for person_data in results:
        # Add a random number of additional hobbies
        num_additional_hobbies = random.randint(1, 3)
        additional_hobbies = [f"Extra Hobby {i}" for i in range(1, num_additional_hobbies + 1)]
        person_data["hobbies"].extend(additional_hobbies)
        # Add a random skill level to each hobby
        hobby_skill_levels = {hobby: random.randint(1, 10) for hobby in person_data["hobbies"]}
        person_data["hobby_skill_levels"] = hobby_skill_levels
        # Add a random boolean indicating if the person has a pet
        person_data["has_pet"] = random.choice([True, False])
    return results
```
Note: The @activity.defn decorator must be used on all activities you define.
Creating Your Temporal Workflow
In Temporal, a Workflow is a function that must be deterministic: given the same inputs, it executes the same commands in the same sequence. This determinism is what lets Temporal leverage its Replay capabilities when an activity or worker fails. The best part? When an activity fails, the state of the workflow is preserved, so only that single activity is run again rather than the entire workflow.
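A toy sketch can show why determinism enables replay. This is not Temporal’s internal implementation, only an illustration of the idea: if each activity result is recorded in a history the first time it runs, re-executing the same deterministic workflow code against that history reproduces the same state without re-running completed activities. All class and function names here are made up.

```python
# Toy replay sketch (not Temporal's internals): activity results are recorded
# the first time they run; on replay, recorded results are returned instead of
# executing the activity again.

class ReplayingExecutor:
    def __init__(self, history=None):
        self.history = list(history or [])  # results of completed activities, in order
        self.position = 0

    def execute_activity(self, fn, *args):
        if self.position < len(self.history):
            result = self.history[self.position]  # replay: reuse the recorded result
        else:
            result = fn(*args)                    # first run: execute and record
            self.history.append(result)
        self.position += 1
        return result


def deterministic_workflow(executor):
    # Deterministic workflow code: same calls, in the same order, every time.
    data = executor.execute_activity(lambda: [1, 2, 3])
    return executor.execute_activity(lambda xs: sum(xs), data)
```

Because the workflow code always issues the same calls in the same order, replaying it against a saved history lands it in exactly the state it was in before the failure. Non-deterministic code (random numbers, wall-clock reads) would issue different calls on replay and desynchronize from the history, which is why such work belongs in activities.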
Your workflow method will follow a format similar to the our_sample_dag method defined before. Define this function in a file called workflows.py:
```python
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from .activities import long_running_function, long_process_method


@workflow.defn
class OurSampleFunc:
    @workflow.run
    async def run(self):
        data = await workflow.execute_activity(
            long_running_function,
            schedule_to_close_timeout=timedelta(seconds=5)
        )
        return await workflow.execute_activity(
            long_process_method,
            data,
            schedule_to_close_timeout=timedelta(seconds=5)
        )
```

Above, you define a class with the @workflow.defn decorator, which must be used when defining a workflow. The method under the @workflow.run decorator spells out exactly what your workflow will do and in what order. In this scenario, you call each of the activities you defined earlier, in the specific order you want them executed. Note: If your long_process_method fails, the retry does not require re-executing long_running_function, so there is no need to perform a second SQL query, or to repeat any other asynchronous task that completed earlier in the workflow.
Setting up the Worker
Now, you have to write the Worker logic that will be able to perform the workflows and activities defined earlier. Take a look at the code below:
```python
import asyncio
import concurrent.futures

from temporalio.client import Client
from temporalio.worker import Worker

# Import the activities and workflow from our other files
from .activities import long_running_function, long_process_method
from .workflows import OurSampleFunc


async def main():
    client = await Client.connect("localhost:7233")

    # Run the worker
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[OurSampleFunc],
            activities=[long_running_function, long_process_method],
            activity_executor=activity_executor,
        )
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```

Here you import the activities and workflow defined in activities.py and workflows.py earlier. When you create a Worker in Temporal, you register the workflows and activities you want it to be able to execute. Assuming you have a Temporal server running on localhost, you can run the worker by executing:

```shell
python run_worker.py
```

This initializes the worker so it is ready to perform the work when you run your workflow. Let’s run an instance of your workflow.
Running a Workflow