كانت خطوط بياناتنا أشبه بالسباغيتي: كيف أنقذنا Apache Airflow من جحيم المهام المتشابكة؟

يا جماعة الخير، السلام عليكم ورحمة الله.

بتذكر مرة، قبل كم سنة، كنا شغالين على نظام توصيات لمتجر إلكتروني. في البداية، كانت الأمور بسيطة: سكربت Python صغير بسحب بيانات المبيعات اليومية، سكربت ثاني بحللها، وثالث بحدّث قاعدة بيانات التوصيات. ربطناهم بـ cron jobs، وكنا مفكرين حالنا “مسيطرين على الوضع”.

بعد شهرين ثلاثة، كبر المشروع. صرنا نسحب بيانات من 5 مصادر مختلفة، ونعمل عمليات تنظيف معقدة، وندرب نموذج تعلم آلة صغير، وبعدين نطلع التوصيات. عدد السكربتات تضاعف، والـ cron jobs صارت زي خريطة مترو أنفاق طوكيو في ساعة الذروة. سكربت بعتمد على خمسة قبله، وسكربت ثاني لازم يشتغل بس إذا الأول نجح والثالث فشل… صارت “عجقة” ما إلها أول من آخر.

الكارثة الحقيقية صارت في يوم جمعة. صحيت الصبح لقيت رسالة من مدير المنتج بسألني: “أبو عمر، ليش التوصيات ما تحدثت من يومين؟”. قلبي وقع في رجليّ. قعدت 3 ساعات أتتبع السجلات (logs)، وأفتح سكربت ورا سكربت، لأكتشف إنه سكربت صغير في أول السلسلة، مسؤول عن سحب بيانات من واجهة برمجية (API)، فشل بصمت… ما أعطى أي خطأ، بس رجّع ملف فاضي. وكل السكربتات اللي بعده اشتغلت على “الفاضي”، والنتيجة كانت كارثية على دقة التوصيات. وقتها صرخت في المكتب وقلت: “يا جماعة، الوضع هيك ما بنفع! خطوط بياناتنا صارت زي صحن السباغيتي، كله معكرونة متشابكة ومش عارفين وين أولها من آخرها!”.

هذه الحادثة كانت نقطة التحول التي قادتنا للبحث عن حل جذري، وكان هذا الحل اسمه: Apache Airflow.

ما هي مشكلة “خطوط بيانات السباغيتي”؟

القصص اللي زي قصتي بتتكرر كل يوم في آلاف الشركات. تبدأ الأمور بسيطة، لكن مع نمو النظام، تتحول عمليات البيانات إلى كابوس حقيقي. المشاكل الأساسية يمكن تلخيصها في عدة نقاط:

  • الاعتماديات الخفية (Hidden Dependencies): أنت لا ترى بوضوح أي مهمة تعتمد على الأخرى. تغيير بسيط في سكربت قد يكسر سلسلة كاملة دون أن تدري.
  • الفشل الصامت (Silent Failures): كما حدث معنا، قد تفشل مهمة دون إرسال أي تنبيه، مما يؤدي إلى “بيانات فاسدة” (data corruption) تنتشر في النظام بأكمله.
  • صعوبة إعادة التشغيل (Difficult Retries): إذا فشلت مهمة في منتصف الليل، هل ستستيقظ خصيصاً لإعادة تشغيلها يدوياً؟ وماذا عن المهام التي تعتمد عليها؟
  • انعدام الرؤية والمراقبة (Lack of Visibility & Monitoring): لا يوجد مكان مركزي واحد لترى فيه حالة جميع مهامك، متى بدأت، متى انتهت، وأيها فشل.
  • الجدولة المحدودة (Limited Scheduling): أدوات مثل cron قوية لكنها محدودة. ماذا لو أردت تشغيل مهمة فقط بعد انتهاء ثلاث مهام أخرى بنجاح؟ أو تشغيلها استجابة لحدث معين؟

الفارس المنقذ: Apache Airflow يدخل المشهد

بكل بساطة، Apache Airflow هو منصة مفتوحة المصدر أُنشئت خصيصاً لبرمجة، جدولة، ومراقبة تدفقات العمل (Workflows) بشكل منهجي. الفكرة العبقرية في Airflow هي أنه يتعامل مع تدفقات العمل كـ “كود” (Workflows as Code).

هذا يعني أنك لم تعد ترسم خططاً على سبورة أو تدير عشرات الـ cron jobs. بدلاً من ذلك، أنت تكتب سكربت Python يصف كل تدفق العمل، بكل تفاصيله واعتمادياته ومنطق عمله.

المفاهيم الأساسية في Airflow

قبل ما ندخل في الكود، خلينا نفهم كم مصطلح أساسي:

  • DAG (Directed Acyclic Graph): هذا هو “قلب” Airflow. لا تخف من الاسم، فكرته بسيطة: هو عبارة عن مجموعة من المهام التي تريد تشغيلها، مع تحديد ترتيبها واتجاه الاعتمادية بينها (Directed)، وبدون وجود أي حلقات لا نهائية (Acyclic). كل ملف Python يحتوي على تعريف لـ DAG واحد أو أكثر.
  • Operator: هو قالب لمهمة معينة. Airflow يأتي مع عدد هائل من الـ Operators الجاهزة:
    • BashOperator: لتشغيل أوامر bash.
    • PythonOperator: لتشغيل دالة Python.
    • PostgresOperator, MySqlOperator: لتنفيذ استعلامات SQL.
    • وهناك المئات غيرها للتفاعل مع خدمات مثل AWS S3, Google Cloud Storage, Snowflake, Databricks, وغيرها.
  • Task: هي نسخة (instance) من الـ Operator. عندما تضع BashOperator في الـ DAG الخاص بك، فأنت تنشئ Task.
  • Task Instance: هي تشغيلة محددة لمهمة في وقت معين.

من الفوضى إلى النظام: مثال عملي

لنتخيل سيناريو “ما قبل السباغيتي” الذي وصفته في البداية: سحب البيانات، معالجتها، ثم تحميلها. هكذا كان يبدو في عالم الـ cron jobs:


# Crontab -e
# كل يوم الساعة 1 صباحاً
0 1 * * * /usr/bin/python3 /home/abu_omar/scripts/fetch_sales_data.py
# كل يوم الساعة 2 صباحاً (نأمل أن تكون المهمة الأولى قد انتهت!)
0 2 * * * /usr/bin/python3 /home/abu_omar/scripts/process_sales_data.py
# كل يوم الساعة 2:30 صباحاً (نأمل مجدداً!)
30 2 * * * /usr/bin/python3 /home/abu_omar/scripts/load_to_recommendation_db.py

المشاكل واضحة: لا يوجد رابط حقيقي بين المهام، وإذا استغرقت المهمة الأولى وقتاً أطول، ستبدأ الثانية قبل الأوان. وإذا فشلت الأولى، ستعمل الثانية على لا شيء!

الآن، لنرى كيف يبدو هذا مع Airflow (الوضع لوز ✨)

سنقوم بإنشاء ملف Python واحد، وليكن اسمه recommendation_pipeline_dag.py، ونضعه في مجلد الـ DAGs الخاص بـ Airflow.


from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# لنفترض أن هذه هي دوالنا التي كانت في سكربتات منفصلة
def fetch_sales_data(**kwargs):
    print("جاري سحب بيانات المبيعات...")
    # ... كود سحب البيانات وحفظها في ملف مؤقت ...
    # يمكننا تمرير مسار الملف للمهمة التالية عبر XComs
    ti = kwargs['ti']
    ti.xcom_push(key='sales_data_path', value='/tmp/sales_data.json')
    print("تم سحب البيانات بنجاح.")

def process_sales_data(**kwargs):
    ti = kwargs['ti']
    # استلام مسار الملف من المهمة السابقة
    data_path = ti.xcom_pull(key='sales_data_path', task_ids='fetch_data_task')
    print(f"جاري معالجة البيانات من الملف: {data_path}")
    # ... كود معالجة البيانات ...
    ti.xcom_push(key='processed_data_path', value='/tmp/processed_data.json')
    print("تمت معالجة البيانات بنجاح.")

def load_to_db(**kwargs):
    ti = kwargs['ti']
    processed_path = ti.xcom_pull(key='processed_data_path', task_ids='process_data_task')
    print(f"جاري تحميل البيانات المعالجة ({processed_path}) إلى قاعدة البيانات...")
    # ... كود تحميل البيانات إلى DB ...
    print("تم تحديث قاعدة بيانات التوصيات.")

# ----------------------------------------------------
# تعريف الـ DAG
# ----------------------------------------------------

default_args = {
    'owner': 'abu_omar',
    'depends_on_past': False,
    'email_on_failure': ['abu.omar@example.com'], # إرسال إيميل عند الفشل
    'email_on_retry': False,
    'retries': 3, # محاولة إعادة التشغيل 3 مرات عند الفشل
    'retry_delay': timedelta(minutes=5), # الانتظار 5 دقائق بين كل محاولة
}

with DAG(
    dag_id='recommendation_engine_pipeline',
    default_args=default_args,
    description='خط بيانات لتحديث محرك التوصيات يومياً',
    schedule_interval='@daily', # التشغيل مرة كل يوم
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['recommendations', 'data-engineering'],
) as dag:

    # تعريف المهام (Tasks)
    fetch_data_task = PythonOperator(
        task_id='fetch_data_task',
        python_callable=fetch_sales_data,
    )

    process_data_task = PythonOperator(
        task_id='process_data_task',
        python_callable=process_sales_data,
    )

    load_to_db_task = PythonOperator(
        task_id='load_to_db_task',
        python_callable=load_to_db,
    )

    # أهم جزء: تحديد الاعتماديات
    # لن تبدأ مهمة المعالجة إلا بعد نجاح مهمة السحب
    # ولن تبدأ مهمة التحميل إلا بعد نجاح مهمة المعالجة
    fetch_data_task >> process_data_task >> load_to_db_task

انظر إلى الجمال والنظام! بكود Python بسيط وواضح، قمنا بتعريف كل شيء:

  1. المهام: كل مهمة هي دالة Python.
  2. الجدولة: schedule_interval='@daily'.
  3. إدارة الفشل: retries=3 و email_on_failure. هذا يحل مشكلة الفشل الصامت تماماً.
  4. الاعتماديات: السطر الأخير fetch_data_task >> process_data_task >> load_to_db_task هو السحر بعينه. إنه يخبر Airflow بوضوح “شغّل هذه، ثم إذا نجحت، شغّل تلك، ثم إذا نجحت، شغّل الأخيرة”.

والأجمل من ذلك، واجهة Airflow الرسومية تريك هذا التدفق بشكل مرئي، وتوضح لك أي المهام تعمل، وأيها نجحت، وأيها فشلت، مع إمكانية الوصول إلى السجلات لكل مهمة بنقرة زر.

نصايح من خبرة أبو عمر

بعد سنوات من العمل مع Airflow، تعلمت بعض الدروس بالطريقة الصعبة أحياناً. اسمحوا لي أن أشارككم إياها لتسهيل طريقكم:

اجعل مهامك “Idempotent”

هذه الكلمة الصعبة تعني شيئاً بسيطاً: يجب أن تكون نتيجة تشغيل مهمتك 5 مرات هي نفسها نتيجة تشغيلها مرة واحدة. هذا مهم جداً لأن Airflow سيحاول إعادة تشغيل المهام الفاشلة تلقائياً. إذا كانت مهمتك تضيف سطراً إلى ملف في كل مرة تعمل فيها، فبعد 3 محاولات فاشلة وواحدة ناجحة، سينتهي بك الأمر بـ 4 أسطر بدلاً من واحد! التصميم الصحيح هو “حذف السطر إذا كان موجوداً ثم إضافته” أو ما شابه.

لا تمرر كميات كبيرة من البيانات بين المهام

آلية Airflow لتمرير البيانات بين المهام (XComs) مصممة للمعلومات الصغيرة والبيانات الوصفية (metadata)، مثل مسار ملف، أو عدد صفوف، أو حالة معينة. ليست مصممة لنقل 10 جيجابايت من البيانات! بدلاً من ذلك، استخدم “مخزناً وسيطاً” (Intermediate Storage) مثل Amazon S3، Google Cloud Storage، أو حتى نظام الملفات المحلي. مهمة تنتج البيانات وتضعها في هذا المخزن، والمهمة التالية تقرأها منه.

خطوط الأنابيب هي كود، فعاملها ككود

استخدم Git لإدارة ملفات الـ DAGs الخاصة بك. هذا يمنحك تاريخاً للتغييرات، ويسمح بالتعاون بين أعضاء الفريق، ويمكّنك من إعداد عمليات CI/CD لنشر التغييرات على Airflow بشكل آلي وآمن.

استغل المتغيرات والاتصالات (Variables & Connections)

لا تكتب كلمات المرور أو مفاتيح الـ API مباشرة في كود الـ DAG. استخدم نظام الاتصالات (Connections) المشفر في Airflow لتخزينها بأمان. وللقيم التي قد تتغير (مثل اسم bucket في S3)، استخدم متغيرات Airflow (Variables). هذا يجعل الـ DAGs قابلة لإعادة الاستخدام وأكثر أماناً.

الخلاصة: من الآخر

الانتقال إلى Apache Airflow كان أحد أفضل القرارات التقنية التي اتخذناها. لقد حولنا من فريق إطفاء حرائق، يقضي وقته في تتبع الأخطاء الصامتة في سكربتات متشابكة، إلى فريق هندسة بيانات استباقي، يبني خطوط بيانات قوية وموثوقة ويمكن التنبؤ بسلوكها.

Airflow لا يحل مشكلة تقنية فقط، بل يحل مشكلة “ثقة”. الثقة في أن بياناتك صحيحة، وأن عملياتك تعمل كما هو متوقع، وأنك ستكون أول من يعلم عند حدوث خطأ ما، وليس آخر من يعلم.

إذا كنت لا تزال تعاني من جحيم الـ cron jobs والسكربتات المتناثرة، فنصيحتي لك يا خوي: أعطِ Apache Airflow فرصة. قد يتطلب الأمر بعض التعلم في البداية، لكن راحة البال والنظام الذي ستحصل عليه لا يقدر بثمن. ✅

أبو عمر

سجل دخولك لعمل نقاش تفاعلي

كافة المحادثات خاصة ولا يتم عرضها على الموقع نهائياً

آراء من النقاشات

لا توجد آراء منشورة بعد. كن أول من يشارك رأيه!

آخر المدونات

برمجة وقواعد بيانات

تحديثات قاعدة البيانات بدون توقف: كيف أنقذنا نمط التوسيع والتعاقد (Expand/Contract) من جحيم التوقفات المجدولة؟

هل سئمت من إيقاف الخدمة مع كل تحديث لهيكلة قاعدة البيانات؟ أشارككم قصة حقيقية وكيف أنقذنا نمط التوسيع والتعاقد (Expand/Contract) من ليالي النشر الطويلة والمُجهدة،...

4 يونيو، 2026 قراءة المزيد
الشبكات والـ APIs

كانت إعادة المحاولة كارثة: كيف أنقذتنا مفاتيح عدم تكرار العمليات (Idempotency Keys) من جحيم الفواتير المزدوجة؟

أشارككم قصة حقيقية من الخنادق البرمجية، يوم كاد خطأ بسيط في إعادة محاولة طلبات الدفع أن يكلفنا سمعتنا وأموال عملائنا. اكتشفوا معنا كيف كانت مفاتيح...

4 يونيو، 2026 قراءة المزيد
الحوسبة السحابية

من التوقف التام إلى النجاة: كيف أنقذتنا استراتيجية “الضوء المرشد” (Pilot Light) يوم انقطعت السحابة؟

أتذكر ذلك اليوم جيدًا، فنجان القهوة الصباحي، وصوت تنبيهات المراقبة يصرخ كأنه يوم القيامة. كانت منطقة سحابية كاملة قد توقفت عن العمل، لكن بفضل استراتيجية...

4 يونيو، 2026 قراءة المزيد
التوظيف وبناء الهوية التقنية

كانت مهمتي البرمجية للاختبار مجرد كود: كيف أنقذني توثيق القرارات من جحيم الصمت بعد المقابلة؟

أشارككم قصة حقيقية من بداياتي، وكيف تعلمت بالطريقة الصعبة أن المهمة البرمجية ليست مجرد كتابة كود، بل هي فرصة لإظهار طريقة تفكيرك. اكتشف كيف يمكن...

4 يونيو، 2026 قراءة المزيد
التكنلوجيا المالية Fintech

من الانتظار لأيام إلى الدفع في ثوانٍ: كيف أنقذتنا شبكات الدفع الفوري من جحيم التحويلات البنكية؟

أسرد لكم من واقع تجربتي كـ "أبو عمر"، كيف عانينا من بطء وتكلفة التحويلات البنكية الدولية، وكيف جاءت شبكات الدفع الفوري ومعيار ISO 20022 لتكون...

4 يونيو، 2026 قراءة المزيد
البنية التحتية وإدارة السيرفرات

كان كل خادم لدينا ‘ندفة ثلج’ فريدة: كيف أنقذنا ‘الكود كبنية تحتية’ (IaC) من جحيم الانجراف اليدوي؟

في هذه المقالة، أشارككم قصة حقيقية من قلب المعركة التقنية مع "خوادم ندفات الثلج" الفوضوية. سنغوص في مفهوم "الكود كبنية تحتية" (IaC) وكيف أن أدوات...

4 يونيو، 2026 قراءة المزيد
اختبارات الاداء والجودة

كانت تغطية الاختبارات 100% لكن الأخطاء تتسرب: كيف أنقذنا “الاختبار الطفري” من جحيم الثقة الزائفة؟

كنا نظن أن تغطية الاختبار بنسبة 100% هي درعنا الواقي، لكن الأخطاء كانت تتسلل إلى الإنتاج كاللصوص في ليل بهيم. اكتشف كيف أنقذنا "الاختبار الطفري"...

4 يونيو، 2026 قراءة المزيد
البودكاست