Python

Task Scheduler Python - Automasi Jadwal dengan Schedule dan APScheduler

Pernah Lupa Backup Database Jam 3 Pagi?

Saya pernah punya client yang minta backup database MySQL otomatis setiap malam. Awalnya saya pakai cron job biasa di Linux, tapi lama-lama ada masalah: cron job gagal diam-diam, log-nya berantakan, dan monitoring-nya nol. Kalau backup gagal tiga hari berturut-turut, saya baru tahu waktu client komplain.

Setelah itu saya coba pendekatan berbeda: bikin scheduler pakai Python. Hasilnya? Lebih fleksibel, bisa kirim notifikasi kalau gagal, dan log-nya rapi. Di artikel ini, saya mau share cara bikin task scheduler di Python pakai dua library populer: schedule dan APScheduler.

Kenalan dengan Schedule - Simpel dan Ringan

Library schedule itu favorit saya kalau cuma butuh scheduling sederhana. Syntax-nya mirip bahasa Inggris, jadi gampang dibaca. Cocok banget buat script yang jalan terus di background.

Install dulu:


pip install schedule

Lalu bikin script pertama:


import schedule
import time
from datetime import datetime

def cek_server():
    """Cek status server setiap 5 menit"""
    print(f"[{datetime.now()}] Mengecek status server...")
    # Di sini taruh logic cek server kamu

def kirim_laporan():
    """Kirim laporan harian jam 8 pagi"""
    print(f"[{datetime.now()}] Mengirim laporan harian...")

def bersihkan_log():
    """Bersihkan log lama setiap hari Minggu jam 2 pagi"""
    print(f"[{datetime.now()}] Membersihkan log lama...")

# Setiap 5 menit
schedule.every(5).minutes.do(cek_server)

# Setiap hari jam 08:00
schedule.every().day.at("08:00").do(kirim_laporan)

# Setiap hari Minggu jam 02:00
schedule.every().sunday.at("02:00").do(bersihkan_log)

print("Scheduler aktif, Ctrl+C untuk berhenti...")
while True:
    schedule.run_pending()
    time.sleep(1)

Yang bikin schedule enak itu readable-nya. schedule.every(5).minutes.do(cek_server) - langsung keliatan apa yang dilakuin. Tapi ada batasan: dia single-thread, jadi kalau task-nya lama (misal backup 10 menit), task berikutnya nunggu selesai dulu.

Tambahkan Error Handling

Ini yang sering dilupain orang. Kalau task gagal tanpa error handling, scheduler tetap jalan tapi output-nya hilang. Saya biasa bikin wrapper:


import schedule
import time
import traceback
import logging
from datetime import datetime

logging.basicConfig(
    filename='scheduler.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def safe_run(job_func):
    """Wrapper biar error ga bikin scheduler mati"""
    def wrapper():
        try:
            logging.info(f"Menjalankan: {job_func.__name__}")
            job_func()
            logging.info(f"Selesai: {job_func.__name__}")
        except Exception as e:
            error_msg = f"ERROR di {job_func.__name__}: {str(e)}\n{traceback.format_exc()}"
            logging.error(error_msg)
            print(error_msg)
            # Bisa tambahin kirim email/Telegram di sini
    return wrapper

def backup_database():
    """Backup database MySQL"""
    print("Backup database dimulai...")
    # Logic backup di sini

def cleanup_temp_files():
    """Hapus file temporary yang lebih dari 7 hari"""
    print("Cleanup file temporary...")

schedule.every().day.at("02:00").do(safe_run(backup_database))
schedule.every(6).hours.do(safe_run(cleanup_temp_files))

while True:
    schedule.run_pending()
    time.sleep(30)

Dengan wrapper safe_run, kalau backup gagal, error-nya tetap ke-log dan scheduler tetap jalan buat task berikutnya.

APScheduler - Lebih Powerful

Kalau butuh fitur lebih advance (multiple threads, persistent jobs, misfire handling), APScheduler jawabannya. Library ini dipakai di production oleh banyak perusahaan.


pip install apscheduler

Contoh dasar dengan APScheduler:


from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def cek_disk_space():
    """Cek disk space server"""
    logger.info(f"[{datetime.now()}] Mengecek disk space...")

def sync_data():
    """Sync data dari API external"""
    logger.info(f"[{datetime.now()}] Sync data dimulai...")

def generate_report():
    """Generate laporan mingguan"""
    logger.info(f"[{datetime.now()}] Generate laporan...")

# Bikin scheduler
scheduler = BackgroundScheduler()

# Tambah job dengan interval
scheduler.add_job(
    cek_disk_space,
    IntervalTrigger(minutes=10),
    id='disk_check',
    name='Cek Disk Space',
    replace_existing=True
)

# Tambah job dengan cron
scheduler.add_job(
    sync_data,
    CronTrigger(hour=6, minute=0),  # Jam 6 pagi
    id='sync_data',
    name='Sync Data Harian'
)

scheduler.add_job(
    generate_report,
    CronTrigger(day_of_week='mon', hour=8),  # Senin jam 8
    id='weekly_report',
    name='Laporan Mingguan'
)

scheduler.start()
print(f"Scheduler aktif dengan {len(scheduler.get_jobs())} jobs")

# Biar script ga langsung keluar
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

Perbandingan Trigger di APScheduler

APScheduler punya tiga jenis trigger utama. Pilih sesuai kebutuhan:

  • IntervalTrigger: Jalankan tiap N detik/menit/jam. Cocok buat polling, health check, atau monitoring real-time
  • CronTrigger: Jadwal tetap seperti cron Linux (hari, jam, menit). Cocok buat backup harian, laporan mingguan
  • DateTrigger: Sekali jalan di waktu tertentu. Cocok buat one-time task kayak "kirim reminder besok jam 9"

from apscheduler.triggers import IntervalTrigger, CronTrigger, DateTrigger
from datetime import datetime, timedelta

# Contoh IntervalTrigger
scheduler.add_job(task, IntervalTrigger(
    hours=2,
    start_date='2026-06-17 08:00:00',
    end_date='2026-06-30 23:59:59'
))

# Contoh CronTrigger kompleks
scheduler.add_job(task, CronTrigger(
    day_of_week='mon-fri',  # Hari kerja aja
    hour='9,12,17',         # Jam 9, 12, 17
    timezone='Asia/Jakarta'
))

# Contoh DateTrigger
scheduler.add_job(task, DateTrigger(
    run_date=datetime.now() + timedelta(hours=1)
))

Integrasikan dengan Flask/FastAPI

Kalau kamu punya web app, APScheduler bisa jalan bareng framework. Ini contoh integrasi dengan Flask:


from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import atexit

app = Flask(__name__)

def cleanup_expired_sessions():
    """Hapus session yang expired"""
    print("Cleaning up expired sessions...")

def send_pending_notifications():
    """Kirim notifikasi yang pending"""
    print("Sending pending notifications...")

scheduler = BackgroundScheduler()
scheduler.add_job(cleanup_expired_sessions, 'interval', hours=1)
scheduler.add_job(send_pending_notifications, 'interval', minutes=15)
scheduler.start()

# Shutdown scheduler saat app berhenti
atexit.register(lambda: scheduler.shutdown())

@app.route('/')
def index():
    return "App berjalan dengan scheduler!"

if __name__ == '__main__':
    app.run()

Untuk FastAPI, polanya mirip. Taruh scheduler di lifespan event:


from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    scheduler.add_job(my_task, 'interval', minutes=30)
    scheduler.start()
    yield
    # Shutdown
    scheduler.shutdown()

app = FastAPI(lifespan=lifespan)

Misfire Handling - Ketika Task Terlewat

Pernah server down terus task yang seharusnya jalan jam 2 pagi kelewat? APScheduler punya misfire_grace_time buat handle ini:


# Task tetap dijalankan kalau terlewat kurang dari 1 jam
scheduler.add_job(
    backup_database,
    CronTrigger(hour=2),
    misfire_grace_time=3600,  # 1 jam grace period
    coalesce=True,  # Kalau kelewat beberapa kali, cuma jalan sekali
    max_instances=1  # Ga boleh ada 2 instance jalan bareng
)

Parameter penting:

  • misfire_grace_time: Berapa detik toleransi kalau task terlewat. Default-nya 1 detik (terlalu ketat)
  • coalesce: Kalau True dan task kelewat berkali-kali, cuma jalan sekali. Hemat resource
  • max_instances: Batasi berapa instance task yang boleh jalan bareng. Prevent overlap

Job Store - Persistensi Jadwal

Secara default, semua job hilang kalau script restart. Biar jadwal tetap ada, pakai job store:


from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Simpan jadwal di database
jobstores = {
    'default': SQLAlchemyJobStore(url='mysql+pymysql://appuser:PasswordKuat123!@localhost/kinara_project')
}

scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.start()

Dengan SQLAlchemy job store, semua jadwal tersimpan di database. Restart server? Jadwal tetap ada. Kamu juga bisa pakai MongoDB, Redis, atau memory (default).

Scheduling Email Otomatis

Ini use case yang sering saya temuin: kirim email reminder otomatis. Contoh lengkap:


import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

def kirim_email_reminder(penerima, subjek, pesan):
    """Kirim email via SMTP"""
    msg = MIMEMultipart()
    msg['From'] = '[email protected]'
    msg['To'] = penerima
    msg['Subject'] = subjek
    msg.attach(MIMEText(pesan, 'html'))
    
    try:
        with smtplib.SMTP('smtp.gmail.com', 587) as server:
            server.starttls()
            server.login('[email protected]', 'your_app_password')
            server.send_message(msg)
        print(f"Email terkirim ke {penerima}")
        return True
    except Exception as e:
        print(f"Gagal kirim email: {e}")
        return False

def reminder_bayar():
    """Kirim reminder pembayaran setiap tanggal 25"""
    data_client = [
        {"email": "[email protected]", "nama": "Budi", "tagihan": "500000"},
        {"email": "[email protected]", "nama": "Sari", "tagihan": "750000"},
    ]
    
    for client in data_client:
        html = f"""
        

Halo {client['nama']},

Reminder: Tagihan bulanan kamu sebesar Rp{client['tagihan']} jatuh tempo tanggal 1. Mohon segera diproses ya!

""" kirim_email_reminder(client['email'], "Reminder Pembayaran", html) scheduler = BackgroundScheduler() scheduler.add_job(reminder_bayar, CronTrigger(day=25, hour=9)) scheduler.start()

Database Cleanup Scheduler

Satu lagi contoh praktis: bersihkan data lama di database secara otomatis. Biasanya tabel log atau session expired numpuk kalau ga dibersihkan:


import mysql.connector
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

def cleanup_old_logs(hari_retention=30):
    """Hapus log yang lebih dari N hari"""
    conn = mysql.connector.connect(
        host='localhost',
        user='appuser',
        password='PasswordKuat123!',
        database='kinara_project'
    )
    cursor = conn.cursor()
    
    batas = datetime.now() - timedelta(days=hari_retention)
    
    # Hapus log lama
    cursor.execute(
        "DELETE FROM activity_logs WHERE created_at < %s",
        (batas,)
    )
    deleted = cursor.rowcount
    conn.commit()
    cursor.close()
    conn.close()
    
    logger.info(f"Cleanup selesai: {deleted} log lama dihapus")
    return deleted

def vacuum_analyze():
    """Optimasi tabel MySQL (ANALYZE TABLE)"""
    tables = ['blog_posting', 'activity_logs', 'sessions']
    conn = mysql.connector.connect(
        host='localhost',
        user='appuser',
        password='PasswordKuat123!',
        database='kinara_project'
    )
    cursor = conn.cursor()
    
    for table in tables:
        cursor.execute(f"ANALYZE TABLE {table}")
        result = cursor.fetchone()
        logger.info(f"ANALYZE {table}: {result[3]}")
    
    cursor.close()
    conn.close()

scheduler = BackgroundScheduler()
scheduler.add_job(cleanup_old_logs, CronTrigger(hour=3, minute=0), args=[30])
scheduler.add_job(vacuum_analyze, CronTrigger(day_of_week='sun', hour=4))
scheduler.start()

Monitoring Scheduler - Cek Status Job

Jangan pasang scheduler terus dilupain. Tambahin monitoring biar tau statusnya:


from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

scheduler = BackgroundScheduler()
# ... tambah jobs ...

def status_report():
    """Cetak status semua job"""
    jobs = scheduler.get_jobs()
    print(f"\n=== Scheduler Status ({datetime.now()}) ===")
    print(f"Total jobs: {len(jobs)}")
    for job in jobs:
        print(f"  - {job.name} (ID: {job.id})")
        print(f"    Next run: {job.next_run_time}")
        print(f"    Trigger: {job.trigger}")
    print("=" * 40)

# Jalankan status report setiap jam
scheduler.add_job(status_report, IntervalTrigger(hours=1))

Kamu juga bisa expose endpoint di Flask/FastAPI buat cek status via HTTP:


@app.route('/scheduler/status')
def scheduler_status():
    jobs = []
    for job in scheduler.get_jobs():
        jobs.append({
            'id': job.id,
            'name': job.name,
            'next_run': str(job.next_run_time),
            'trigger': str(job.trigger)
        })
    return {'status': 'running', 'jobs': jobs, 'count': len(jobs)}

Kapan Pakai Schedule vs APScheduler?

Pilih berdasarkan kebutuhan:

  • Pakai schedule kalau: Script sederhana, satu mesin, ga butuh persistensi, mau cepat setup. Cocok buat cron job replacement yang ringan
  • Pakai APScheduler kalau: Butuh multi-threading, persistent jobs (survive restart), integrasi web framework, misfire handling, atau monitoring via API. Cocok buat production
  • Tetap pakai cron Linux kalau: Task-nya super simpel, ga butuh Python logic, dan server-nya dedicated Linux

Tips dari Pengalaman

Beberapa hal yang saya pelajari dari pakai task scheduler di production:

  • Always add logging: Tanpa log, debugging scheduler itu mimpi buruk. Pakai logging module, bukan print()
  • Handle timezone: Kalau server di UTC tapi target WIB, pastikan set timezone di scheduler. APScheduler support timezone='Asia/Jakarta'
  • Jangan lupa shutdown: Scheduler yang ga di-shutdown bisa bikin zombie process. Pakai atexit.register() atau context manager
  • Test with shorter intervals: Pas development, pakai interval pendek (1 menit) buat testing. Jangan nunggu jam 2 pagi buat verify backup works
  • Alert on failure: Integrasikan dengan Telegram bot atau email buat notifikasi kalau task gagal. Saya pakai requests ke Telegram API

Penutup

Task scheduler di Python itu powerful banget buat automasi. Mulai dari backup database, cleanup log, kirim email reminder, sampai sync data - semua bisa dijadwalkan otomatis. Mulai dari schedule buat yang simpel, naik ke APScheduler kalau butuh fitur production-grade.

Yang penting: jangan lupa error handling dan monitoring. Scheduler yang gagal diam-diam itu lebih bahaya dari ga ada scheduler sama sekali.


You may also like


0 Comments


Leave a Reply

Comments with links or spam keywords will be rejected.
Scroll to Top