كانت خطوط بياناتنا أشبه بالسباغيتي: كيف أنقذنا Apache Airflow من جحيم المهام المتشابكة؟

يا جماعة الخير، السلام عليكم ورحمة الله.

بتذكر مرة، قبل كم سنة، كنا شغالين على نظام توصيات لمتجر إلكتروني. في البداية، كانت الأمور بسيطة: سكربت Python صغير بسحب بيانات المبيعات اليومية، سكربت ثاني بحللها، وثالث بحدّث قاعدة بيانات التوصيات. ربطناهم بـ cron jobs، وكنا مفكرين حالنا “مسيطرين على الوضع”.

بعد شهرين ثلاثة، كبر المشروع. صرنا نسحب بيانات من 5 مصادر مختلفة، ونعمل عمليات تنظيف معقدة، وندرب نموذج تعلم آلة صغير، وبعدين نطلع التوصيات. عدد السكربتات تضاعف، والـ cron jobs صارت زي خريطة مترو أنفاق طوكيو في ساعة الذروة. سكربت بعتمد على خمسة قبله، وسكربت ثاني لازم يشتغل بس إذا الأول نجح والثالث فشل… صارت “عجقة” ما إلها أول من آخر.

الكارثة الحقيقية صارت في يوم جمعة. صحيت الصبح لقيت رسالة من مدير المنتج بسألني: “أبو عمر، ليش التوصيات ما تحدثت من يومين؟”. قلبي وقع في رجليّ. قعدت 3 ساعات أتتبع السجلات (logs)، وأفتح سكربت ورا سكربت، لأكتشف إنه سكربت صغير في أول السلسلة، مسؤول عن سحب بيانات من واجهة برمجية (API)، فشل بصمت… ما أعطى أي خطأ، بس رجّع ملف فاضي. وكل السكربتات اللي بعده اشتغلت على “الفاضي”، والنتيجة كانت كارثية على دقة التوصيات. وقتها صرخت في المكتب وقلت: “يا جماعة، الوضع هيك ما بنفع! خطوط بياناتنا صارت زي صحن السباغيتي، كله معكرونة متشابكة ومش عارفين وين أولها من آخرها!”.

هذه الحادثة كانت نقطة التحول التي قادتنا للبحث عن حل جذري، وكان هذا الحل اسمه: Apache Airflow.

ما هي مشكلة “خطوط بيانات السباغيتي”؟

القصص اللي زي قصتي بتتكرر كل يوم في آلاف الشركات. تبدأ الأمور بسيطة، لكن مع نمو النظام، تتحول عمليات البيانات إلى كابوس حقيقي. المشاكل الأساسية يمكن تلخيصها في عدة نقاط:

  • الاعتماديات الخفية (Hidden Dependencies): أنت لا ترى بوضوح أي مهمة تعتمد على الأخرى. تغيير بسيط في سكربت قد يكسر سلسلة كاملة دون أن تدري.
  • الفشل الصامت (Silent Failures): كما حدث معنا، قد تفشل مهمة دون إرسال أي تنبيه، مما يؤدي إلى “بيانات فاسدة” (data corruption) تنتشر في النظام بأكمله.
  • صعوبة إعادة التشغيل (Difficult Retries): إذا فشلت مهمة في منتصف الليل، هل ستستيقظ خصيصاً لإعادة تشغيلها يدوياً؟ وماذا عن المهام التي تعتمد عليها؟
  • انعدام الرؤية والمراقبة (Lack of Visibility & Monitoring): لا يوجد مكان مركزي واحد لترى فيه حالة جميع مهامك، متى بدأت، متى انتهت، وأيها فشل.
  • الجدولة المحدودة (Limited Scheduling): أدوات مثل cron قوية لكنها محدودة. ماذا لو أردت تشغيل مهمة فقط بعد انتهاء ثلاث مهام أخرى بنجاح؟ أو تشغيلها استجابة لحدث معين؟

الفارس المنقذ: Apache Airflow يدخل المشهد

بكل بساطة، Apache Airflow هو منصة مفتوحة المصدر أُنشئت خصيصاً لبرمجة، جدولة، ومراقبة تدفقات العمل (Workflows) بشكل منهجي. الفكرة العبقرية في Airflow هي أنه يتعامل مع تدفقات العمل كـ “كود” (Workflows as Code).

هذا يعني أنك لم تعد ترسم خططاً على سبورة أو تدير عشرات الـ cron jobs. بدلاً من ذلك، أنت تكتب سكربت Python يصف كل تدفق العمل، بكل تفاصيله واعتمادياته ومنطق عمله.

المفاهيم الأساسية في Airflow

قبل ما ندخل في الكود، خلينا نفهم كم مصطلح أساسي:

  • DAG (Directed Acyclic Graph): هذا هو “قلب” Airflow. لا تخف من الاسم، فكرته بسيطة: هو عبارة عن مجموعة من المهام التي تريد تشغيلها، مع تحديد ترتيبها واتجاه الاعتمادية بينها (Directed)، وبدون وجود أي حلقات لا نهائية (Acyclic). كل ملف Python يحتوي على تعريف لـ DAG واحد أو أكثر.
  • Operator: هو قالب لمهمة معينة. Airflow يأتي مع عدد هائل من الـ Operators الجاهزة:
    • BashOperator: لتشغيل أوامر bash.
    • PythonOperator: لتشغيل دالة Python.
    • PostgresOperator, MySqlOperator: لتنفيذ استعلامات SQL.
    • وهناك المئات غيرها للتفاعل مع خدمات مثل AWS S3, Google Cloud Storage, Snowflake, Databricks, وغيرها.
  • Task: هي نسخة (instance) من الـ Operator. عندما تضع BashOperator في الـ DAG الخاص بك، فأنت تنشئ Task.
  • Task Instance: هي تشغيلة محددة لمهمة في وقت معين.

من الفوضى إلى النظام: مثال عملي

لنتخيل سيناريو “ما قبل السباغيتي” الذي وصفته في البداية: سحب البيانات، معالجتها، ثم تحميلها. هكذا كان يبدو في عالم الـ cron jobs:


# Crontab -e
# كل يوم الساعة 1 صباحاً
0 1 * * * /usr/bin/python3 /home/abu_omar/scripts/fetch_sales_data.py
# كل يوم الساعة 2 صباحاً (نأمل أن تكون المهمة الأولى قد انتهت!)
0 2 * * * /usr/bin/python3 /home/abu_omar/scripts/process_sales_data.py
# كل يوم الساعة 2:30 صباحاً (نأمل مجدداً!)
30 2 * * * /usr/bin/python3 /home/abu_omar/scripts/load_to_recommendation_db.py

المشاكل واضحة: لا يوجد رابط حقيقي بين المهام، وإذا استغرقت المهمة الأولى وقتاً أطول، ستبدأ الثانية قبل الأوان. وإذا فشلت الأولى، ستعمل الثانية على لا شيء!

الآن، لنرى كيف يبدو هذا مع Airflow (الوضع لوز ✨)

سنقوم بإنشاء ملف Python واحد، وليكن اسمه recommendation_pipeline_dag.py، ونضعه في مجلد الـ DAGs الخاص بـ Airflow.


from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# لنفترض أن هذه هي دوالنا التي كانت في سكربتات منفصلة
def fetch_sales_data(**kwargs):
    print("جاري سحب بيانات المبيعات...")
    # ... كود سحب البيانات وحفظها في ملف مؤقت ...
    # يمكننا تمرير مسار الملف للمهمة التالية عبر XComs
    ti = kwargs['ti']
    ti.xcom_push(key='sales_data_path', value='/tmp/sales_data.json')
    print("تم سحب البيانات بنجاح.")

def process_sales_data(**kwargs):
    ti = kwargs['ti']
    # استلام مسار الملف من المهمة السابقة
    data_path = ti.xcom_pull(key='sales_data_path', task_ids='fetch_data_task')
    print(f"جاري معالجة البيانات من الملف: {data_path}")
    # ... كود معالجة البيانات ...
    ti.xcom_push(key='processed_data_path', value='/tmp/processed_data.json')
    print("تمت معالجة البيانات بنجاح.")

def load_to_db(**kwargs):
    ti = kwargs['ti']
    processed_path = ti.xcom_pull(key='processed_data_path', task_ids='process_data_task')
    print(f"جاري تحميل البيانات المعالجة ({processed_path}) إلى قاعدة البيانات...")
    # ... كود تحميل البيانات إلى DB ...
    print("تم تحديث قاعدة بيانات التوصيات.")

# ----------------------------------------------------
# تعريف الـ DAG
# ----------------------------------------------------

default_args = {
    'owner': 'abu_omar',
    'depends_on_past': False,
    'email_on_failure': ['abu.omar@example.com'], # إرسال إيميل عند الفشل
    'email_on_retry': False,
    'retries': 3, # محاولة إعادة التشغيل 3 مرات عند الفشل
    'retry_delay': timedelta(minutes=5), # الانتظار 5 دقائق بين كل محاولة
}

with DAG(
    dag_id='recommendation_engine_pipeline',
    default_args=default_args,
    description='خط بيانات لتحديث محرك التوصيات يومياً',
    schedule_interval='@daily', # التشغيل مرة كل يوم
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['recommendations', 'data-engineering'],
) as dag:

    # تعريف المهام (Tasks)
    fetch_data_task = PythonOperator(
        task_id='fetch_data_task',
        python_callable=fetch_sales_data,
    )

    process_data_task = PythonOperator(
        task_id='process_data_task',
        python_callable=process_sales_data,
    )

    load_to_db_task = PythonOperator(
        task_id='load_to_db_task',
        python_callable=load_to_db,
    )

    # أهم جزء: تحديد الاعتماديات
    # لن تبدأ مهمة المعالجة إلا بعد نجاح مهمة السحب
    # ولن تبدأ مهمة التحميل إلا بعد نجاح مهمة المعالجة
    fetch_data_task >> process_data_task >> load_to_db_task

انظر إلى الجمال والنظام! بكود Python بسيط وواضح، قمنا بتعريف كل شيء:

  1. المهام: كل مهمة هي دالة Python.
  2. الجدولة: schedule_interval='@daily'.
  3. إدارة الفشل: retries=3 و email_on_failure. هذا يحل مشكلة الفشل الصامت تماماً.
  4. الاعتماديات: السطر الأخير fetch_data_task >> process_data_task >> load_to_db_task هو السحر بعينه. إنه يخبر Airflow بوضوح “شغّل هذه، ثم إذا نجحت، شغّل تلك، ثم إذا نجحت، شغّل الأخيرة”.

والأجمل من ذلك، واجهة Airflow الرسومية تريك هذا التدفق بشكل مرئي، وتوضح لك أي المهام تعمل، وأيها نجحت، وأيها فشلت، مع إمكانية الوصول إلى السجلات لكل مهمة بنقرة زر.

نصايح من خبرة أبو عمر

بعد سنوات من العمل مع Airflow، تعلمت بعض الدروس بالطريقة الصعبة أحياناً. اسمحوا لي أن أشارككم إياها لتسهيل طريقكم:

اجعل مهامك “Idempotent”

هذه الكلمة الصعبة تعني شيئاً بسيطاً: يجب أن تكون نتيجة تشغيل مهمتك 5 مرات هي نفسها نتيجة تشغيلها مرة واحدة. هذا مهم جداً لأن Airflow سيحاول إعادة تشغيل المهام الفاشلة تلقائياً. إذا كانت مهمتك تضيف سطراً إلى ملف في كل مرة تعمل فيها، فبعد 3 محاولات فاشلة وواحدة ناجحة، سينتهي بك الأمر بـ 4 أسطر بدلاً من واحد! التصميم الصحيح هو “حذف السطر إذا كان موجوداً ثم إضافته” أو ما شابه.

لا تمرر كميات كبيرة من البيانات بين المهام

آلية Airflow لتمرير البيانات بين المهام (XComs) مصممة للمعلومات الصغيرة والبيانات الوصفية (metadata)، مثل مسار ملف، أو عدد صفوف، أو حالة معينة. ليست مصممة لنقل 10 جيجابايت من البيانات! بدلاً من ذلك، استخدم “مخزناً وسيطاً” (Intermediate Storage) مثل Amazon S3، Google Cloud Storage، أو حتى نظام الملفات المحلي. مهمة تنتج البيانات وتضعها في هذا المخزن، والمهمة التالية تقرأها منه.

خطوط الأنابيب هي كود، فعاملها ككود

استخدم Git لإدارة ملفات الـ DAGs الخاصة بك. هذا يمنحك تاريخاً للتغييرات، ويسمح بالتعاون بين أعضاء الفريق، ويمكّنك من إعداد عمليات CI/CD لنشر التغييرات على Airflow بشكل آلي وآمن.

استغل المتغيرات والاتصالات (Variables & Connections)

لا تكتب كلمات المرور أو مفاتيح الـ API مباشرة في كود الـ DAG. استخدم نظام الاتصالات (Connections) المشفر في Airflow لتخزينها بأمان. وللقيم التي قد تتغير (مثل اسم bucket في S3)، استخدم متغيرات Airflow (Variables). هذا يجعل الـ DAGs قابلة لإعادة الاستخدام وأكثر أماناً.

الخلاصة: من الآخر

الانتقال إلى Apache Airflow كان أحد أفضل القرارات التقنية التي اتخذناها. لقد حولنا من فريق إطفاء حرائق، يقضي وقته في تتبع الأخطاء الصامتة في سكربتات متشابكة، إلى فريق هندسة بيانات استباقي، يبني خطوط بيانات قوية وموثوقة ويمكن التنبؤ بسلوكها.

Airflow لا يحل مشكلة تقنية فقط، بل يحل مشكلة “ثقة”. الثقة في أن بياناتك صحيحة، وأن عملياتك تعمل كما هو متوقع، وأنك ستكون أول من يعلم عند حدوث خطأ ما، وليس آخر من يعلم.

إذا كنت لا تزال تعاني من جحيم الـ cron jobs والسكربتات المتناثرة، فنصيحتي لك يا خوي: أعطِ Apache Airflow فرصة. قد يتطلب الأمر بعض التعلم في البداية، لكن راحة البال والنظام الذي ستحصل عليه لا يقدر بثمن. ✅

أبو عمر

سجل دخولك لعمل نقاش تفاعلي

كافة المحادثات خاصة ولا يتم عرضها على الموقع نهائياً

آراء من النقاشات

لا توجد آراء منشورة بعد. كن أول من يشارك رأيه!

آخر المدونات

اختبارات الاداء والجودة

كانت تغطية اختباراتنا 100% لكنها كذبة: كيف أنقذنا “الاختبار الطفري” من جحيم الثقة الزائفة؟

كنا نظن أن تغطية اختبارات بنسبة 100% هي درعنا الواقي، لكنها كانت ثقة زائفة. أشارككم قصة كيف كشف "الاختبار الطفري" (Mutation Testing) ضعف اختباراتنا وأنقذ...

28 أبريل، 2026 قراءة المزيد
أدوات وانتاجية

كانت بيئة التطوير لدينا كلاً في وادٍ: كيف أنقذتنا ‘حاويات التطوير’ (Dev Containers) من جحيم ‘إنها تعمل على جهازي’؟

أتذكر جيداً ذلك اليوم الذي كاد فيه مبرمج جديد أن يستقيل في أسبوعه الأول بسبب مشاكل إعداد البيئة. في هذه المقالة، أسرد لكم يا جماعة...

28 أبريل، 2026 قراءة المزيد
نصائح برمجية

كان كودنا يثق بالجميع: كيف أنقذتنا ‘البرمجة الدفاعية’ من جحيم المدخلات غير المتوقعة؟

في عالم البرمجة، الثقة المفرطة هي وصفة لكارثة. أشارككم قصة حقيقية من تجربتي حول كيف تعلمنا بالطريقة الصعبة أهمية "البرمجة الدفاعية"، وكيف يمكن لهذا المفهوم...

28 أبريل، 2026 قراءة المزيد
​معمارية البرمجيات

كانت نماذج بياناتنا في حرب أهلية: كيف أنقذ نمط CQRS نظامنا من جحيم تضارب القراءة والكتابة؟

في عالم البرمجيات، تتصارع عمليات القراءة والكتابة على نفس البيانات، مما يخلق فوضى في الأداء والتعقيد. أشارككم قصة حقيقية حول كيف استخدمنا نمط CQRS لفصل...

28 أبريل، 2026 قراءة المزيد
تجربة المستخدم والابداع البصري

كانت قوائمنا تربك المستخدمين: كيف أنقذنا ‘قانون هيك’ (Hick’s Law) من جحيم شلل الاختيار؟

أشارككم قصة حقيقية من الميدان، حين كانت قوائم تطبيقنا الطويلة تتسبب في "شلل الاختيار" للمستخدمين. اكتشفوا كيف استعنا بقانون "هيك" النفسي لتبسيط الواجهات، وتقليل الحيرة،...

28 أبريل، 2026 قراءة المزيد
البودكاست