يا جماعة الخير، السلام عليكم ورحمة الله.
بتذكر مرة، قبل كم سنة، كنا شغالين على نظام توصيات لمتجر إلكتروني. في البداية، كانت الأمور بسيطة: سكربت Python صغير بسحب بيانات المبيعات اليومية، سكربت ثاني بحللها، وثالث بحدّث قاعدة بيانات التوصيات. ربطناهم بـ cron jobs، وكنا مفكرين حالنا “مسيطرين على الوضع”.
بعد شهرين ثلاثة، كبر المشروع. صرنا نسحب بيانات من 5 مصادر مختلفة، ونعمل عمليات تنظيف معقدة، وندرب نموذج تعلم آلة صغير، وبعدين نطلع التوصيات. عدد السكربتات تضاعف، والـ cron jobs صارت زي خريطة مترو أنفاق طوكيو في ساعة الذروة. سكربت بعتمد على خمسة قبله، وسكربت ثاني لازم يشتغل بس إذا الأول نجح والثالث فشل… صارت “عجقة” ما إلها أول من آخر.
الكارثة الحقيقية صارت في يوم جمعة. صحيت الصبح لقيت رسالة من مدير المنتج بسألني: “أبو عمر، ليش التوصيات ما تحدثت من يومين؟”. قلبي وقع في رجليّ. قعدت 3 ساعات أتتبع السجلات (logs)، وأفتح سكربت ورا سكربت، لأكتشف إنه سكربت صغير في أول السلسلة، مسؤول عن سحب بيانات من واجهة برمجية (API)، فشل بصمت… ما أعطى أي خطأ، بس رجّع ملف فاضي. وكل السكربتات اللي بعده اشتغلت على “الفاضي”، والنتيجة كانت كارثية على دقة التوصيات. وقتها صرخت في المكتب وقلت: “يا جماعة، الوضع هيك ما بنفع! خطوط بياناتنا صارت زي صحن السباغيتي، كله معكرونة متشابكة ومش عارفين وين أولها من آخرها!”.
هذه الحادثة كانت نقطة التحول التي قادتنا للبحث عن حل جذري، وكان هذا الحل اسمه: Apache Airflow.
ما هي مشكلة “خطوط بيانات السباغيتي”؟
القصص اللي زي قصتي بتتكرر كل يوم في آلاف الشركات. تبدأ الأمور بسيطة، لكن مع نمو النظام، تتحول عمليات البيانات إلى كابوس حقيقي. المشاكل الأساسية يمكن تلخيصها في عدة نقاط:
- الاعتماديات الخفية (Hidden Dependencies): أنت لا ترى بوضوح أي مهمة تعتمد على الأخرى. تغيير بسيط في سكربت قد يكسر سلسلة كاملة دون أن تدري.
- الفشل الصامت (Silent Failures): كما حدث معنا، قد تفشل مهمة دون إرسال أي تنبيه، مما يؤدي إلى “بيانات فاسدة” (data corruption) تنتشر في النظام بأكمله.
- صعوبة إعادة التشغيل (Difficult Retries): إذا فشلت مهمة في منتصف الليل، هل ستستيقظ خصيصاً لإعادة تشغيلها يدوياً؟ وماذا عن المهام التي تعتمد عليها؟
- انعدام الرؤية والمراقبة (Lack of Visibility & Monitoring): لا يوجد مكان مركزي واحد لترى فيه حالة جميع مهامك، متى بدأت، متى انتهت، وأيها فشل.
- الجدولة المحدودة (Limited Scheduling): أدوات مثل cron قوية لكنها محدودة. ماذا لو أردت تشغيل مهمة فقط بعد انتهاء ثلاث مهام أخرى بنجاح؟ أو تشغيلها استجابة لحدث معين؟
الفارس المنقذ: Apache Airflow يدخل المشهد
بكل بساطة، Apache Airflow هو منصة مفتوحة المصدر أُنشئت خصيصاً لبرمجة، جدولة، ومراقبة تدفقات العمل (Workflows) بشكل منهجي. الفكرة العبقرية في Airflow هي أنه يتعامل مع تدفقات العمل كـ “كود” (Workflows as Code).
هذا يعني أنك لم تعد ترسم خططاً على سبورة أو تدير عشرات الـ cron jobs. بدلاً من ذلك، أنت تكتب سكربت Python يصف كل تدفق العمل، بكل تفاصيله واعتمادياته ومنطق عمله.
المفاهيم الأساسية في Airflow
قبل ما ندخل في الكود، خلينا نفهم كم مصطلح أساسي:
- DAG (Directed Acyclic Graph): هذا هو “قلب” Airflow. لا تخف من الاسم، فكرته بسيطة: هو عبارة عن مجموعة من المهام التي تريد تشغيلها، مع تحديد ترتيبها واتجاه الاعتمادية بينها (Directed)، وبدون وجود أي حلقات لا نهائية (Acyclic). كل ملف Python يحتوي على تعريف لـ DAG واحد أو أكثر.
- Operator: هو قالب لمهمة معينة. Airflow يأتي مع عدد هائل من الـ Operators الجاهزة:
BashOperator: لتشغيل أوامر bash.PythonOperator: لتشغيل دالة Python.PostgresOperator,MySqlOperator: لتنفيذ استعلامات SQL.- وهناك المئات غيرها للتفاعل مع خدمات مثل AWS S3, Google Cloud Storage, Snowflake, Databricks, وغيرها.
- Task: هي نسخة (instance) من الـ Operator. عندما تضع
BashOperatorفي الـ DAG الخاص بك، فأنت تنشئ Task. - Task Instance: هي تشغيلة محددة لمهمة في وقت معين.
من الفوضى إلى النظام: مثال عملي
لنتخيل سيناريو “ما قبل السباغيتي” الذي وصفته في البداية: سحب البيانات، معالجتها، ثم تحميلها. هكذا كان يبدو في عالم الـ cron jobs:
# Crontab -e
# كل يوم الساعة 1 صباحاً
0 1 * * * /usr/bin/python3 /home/abu_omar/scripts/fetch_sales_data.py
# كل يوم الساعة 2 صباحاً (نأمل أن تكون المهمة الأولى قد انتهت!)
0 2 * * * /usr/bin/python3 /home/abu_omar/scripts/process_sales_data.py
# كل يوم الساعة 2:30 صباحاً (نأمل مجدداً!)
30 2 * * * /usr/bin/python3 /home/abu_omar/scripts/load_to_recommendation_db.py
المشاكل واضحة: لا يوجد رابط حقيقي بين المهام، وإذا استغرقت المهمة الأولى وقتاً أطول، ستبدأ الثانية قبل الأوان. وإذا فشلت الأولى، ستعمل الثانية على لا شيء!
الآن، لنرى كيف يبدو هذا مع Airflow (الوضع لوز ✨)
سنقوم بإنشاء ملف Python واحد، وليكن اسمه recommendation_pipeline_dag.py، ونضعه في مجلد الـ DAGs الخاص بـ Airflow.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# لنفترض أن هذه هي دوالنا التي كانت في سكربتات منفصلة
def fetch_sales_data(**kwargs):
print("جاري سحب بيانات المبيعات...")
# ... كود سحب البيانات وحفظها في ملف مؤقت ...
# يمكننا تمرير مسار الملف للمهمة التالية عبر XComs
ti = kwargs['ti']
ti.xcom_push(key='sales_data_path', value='/tmp/sales_data.json')
print("تم سحب البيانات بنجاح.")
def process_sales_data(**kwargs):
ti = kwargs['ti']
# استلام مسار الملف من المهمة السابقة
data_path = ti.xcom_pull(key='sales_data_path', task_ids='fetch_data_task')
print(f"جاري معالجة البيانات من الملف: {data_path}")
# ... كود معالجة البيانات ...
ti.xcom_push(key='processed_data_path', value='/tmp/processed_data.json')
print("تمت معالجة البيانات بنجاح.")
def load_to_db(**kwargs):
ti = kwargs['ti']
processed_path = ti.xcom_pull(key='processed_data_path', task_ids='process_data_task')
print(f"جاري تحميل البيانات المعالجة ({processed_path}) إلى قاعدة البيانات...")
# ... كود تحميل البيانات إلى DB ...
print("تم تحديث قاعدة بيانات التوصيات.")
# ----------------------------------------------------
# تعريف الـ DAG
# ----------------------------------------------------
default_args = {
'owner': 'abu_omar',
'depends_on_past': False,
'email_on_failure': ['abu.omar@example.com'], # إرسال إيميل عند الفشل
'email_on_retry': False,
'retries': 3, # محاولة إعادة التشغيل 3 مرات عند الفشل
'retry_delay': timedelta(minutes=5), # الانتظار 5 دقائق بين كل محاولة
}
with DAG(
dag_id='recommendation_engine_pipeline',
default_args=default_args,
description='خط بيانات لتحديث محرك التوصيات يومياً',
schedule_interval='@daily', # التشغيل مرة كل يوم
start_date=datetime(2023, 1, 1),
catchup=False,
tags=['recommendations', 'data-engineering'],
) as dag:
# تعريف المهام (Tasks)
fetch_data_task = PythonOperator(
task_id='fetch_data_task',
python_callable=fetch_sales_data,
)
process_data_task = PythonOperator(
task_id='process_data_task',
python_callable=process_sales_data,
)
load_to_db_task = PythonOperator(
task_id='load_to_db_task',
python_callable=load_to_db,
)
# أهم جزء: تحديد الاعتماديات
# لن تبدأ مهمة المعالجة إلا بعد نجاح مهمة السحب
# ولن تبدأ مهمة التحميل إلا بعد نجاح مهمة المعالجة
fetch_data_task >> process_data_task >> load_to_db_task
انظر إلى الجمال والنظام! بكود Python بسيط وواضح، قمنا بتعريف كل شيء:
- المهام: كل مهمة هي دالة Python.
- الجدولة:
schedule_interval='@daily'. - إدارة الفشل:
retries=3وemail_on_failure. هذا يحل مشكلة الفشل الصامت تماماً. - الاعتماديات: السطر الأخير
fetch_data_task >> process_data_task >> load_to_db_taskهو السحر بعينه. إنه يخبر Airflow بوضوح “شغّل هذه، ثم إذا نجحت، شغّل تلك، ثم إذا نجحت، شغّل الأخيرة”.
والأجمل من ذلك، واجهة Airflow الرسومية تريك هذا التدفق بشكل مرئي، وتوضح لك أي المهام تعمل، وأيها نجحت، وأيها فشلت، مع إمكانية الوصول إلى السجلات لكل مهمة بنقرة زر.
نصايح من خبرة أبو عمر
بعد سنوات من العمل مع Airflow، تعلمت بعض الدروس بالطريقة الصعبة أحياناً. اسمحوا لي أن أشارككم إياها لتسهيل طريقكم:
اجعل مهامك “Idempotent”
هذه الكلمة الصعبة تعني شيئاً بسيطاً: يجب أن تكون نتيجة تشغيل مهمتك 5 مرات هي نفسها نتيجة تشغيلها مرة واحدة. هذا مهم جداً لأن Airflow سيحاول إعادة تشغيل المهام الفاشلة تلقائياً. إذا كانت مهمتك تضيف سطراً إلى ملف في كل مرة تعمل فيها، فبعد 3 محاولات فاشلة وواحدة ناجحة، سينتهي بك الأمر بـ 4 أسطر بدلاً من واحد! التصميم الصحيح هو “حذف السطر إذا كان موجوداً ثم إضافته” أو ما شابه.
لا تمرر كميات كبيرة من البيانات بين المهام
آلية Airflow لتمرير البيانات بين المهام (XComs) مصممة للمعلومات الصغيرة والبيانات الوصفية (metadata)، مثل مسار ملف، أو عدد صفوف، أو حالة معينة. ليست مصممة لنقل 10 جيجابايت من البيانات! بدلاً من ذلك، استخدم “مخزناً وسيطاً” (Intermediate Storage) مثل Amazon S3، Google Cloud Storage، أو حتى نظام الملفات المحلي. مهمة تنتج البيانات وتضعها في هذا المخزن، والمهمة التالية تقرأها منه.
خطوط الأنابيب هي كود، فعاملها ككود
استخدم Git لإدارة ملفات الـ DAGs الخاصة بك. هذا يمنحك تاريخاً للتغييرات، ويسمح بالتعاون بين أعضاء الفريق، ويمكّنك من إعداد عمليات CI/CD لنشر التغييرات على Airflow بشكل آلي وآمن.
استغل المتغيرات والاتصالات (Variables & Connections)
لا تكتب كلمات المرور أو مفاتيح الـ API مباشرة في كود الـ DAG. استخدم نظام الاتصالات (Connections) المشفر في Airflow لتخزينها بأمان. وللقيم التي قد تتغير (مثل اسم bucket في S3)، استخدم متغيرات Airflow (Variables). هذا يجعل الـ DAGs قابلة لإعادة الاستخدام وأكثر أماناً.
الخلاصة: من الآخر
الانتقال إلى Apache Airflow كان أحد أفضل القرارات التقنية التي اتخذناها. لقد حولنا من فريق إطفاء حرائق، يقضي وقته في تتبع الأخطاء الصامتة في سكربتات متشابكة، إلى فريق هندسة بيانات استباقي، يبني خطوط بيانات قوية وموثوقة ويمكن التنبؤ بسلوكها.
Airflow لا يحل مشكلة تقنية فقط، بل يحل مشكلة “ثقة”. الثقة في أن بياناتك صحيحة، وأن عملياتك تعمل كما هو متوقع، وأنك ستكون أول من يعلم عند حدوث خطأ ما، وليس آخر من يعلم.
إذا كنت لا تزال تعاني من جحيم الـ cron jobs والسكربتات المتناثرة، فنصيحتي لك يا خوي: أعطِ Apache Airflow فرصة. قد يتطلب الأمر بعض التعلم في البداية، لكن راحة البال والنظام الذي ستحصل عليه لا يقدر بثمن. ✅