Source
id: advanced-scheduling
namespace: company.team

inputs:
  - id: country
    type: STRING
    defaults: US
  - id: date
    type: DATETIME
    required: false
    defaults: 2023-12-24T14:00:00.000Z

tasks:
  - id: check_if_business_date
    type: io.kestra.plugin.scripts.python.Commands
    warningOnStdErr: false
    namespaceFiles:
      enabled: true
    commands:
      - python schedule.py "{{trigger.date ?? inputs.date}}" {{inputs.country}}
    beforeCommands:
      - pip install workalendar
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: business day - continuing the flow...

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: 0 14 25 12 *
About this blueprint
Namespace Files, Trigger, Python, Schedule
This flow runs one or more tasks only on business days for a specific country. The Python script in this flow checks whether the date is a business day for the specified country. If it is, the flow continues to the next task. If it is not, the script exits with a non-zero status, so the task fails and blocks the execution of subsequent tasks.
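The blocking behavior rests on the script's exit status. The following is a minimal sketch of that convention, assuming that the task is marked as failed whenever the command exits with a non-zero code. The helper name `run_check` is hypothetical and only illustrates the exit-code convention, not Kestra's internals.

```python
import subprocess
import sys


def run_check(exit_code: int) -> bool:
    """Run a child Python process that exits with `exit_code`.

    Returns True when the child succeeded (exit code 0), False otherwise.
    """
    result = subprocess.run(
        [sys.executable, "-c", f"import sys; sys.exit({exit_code})"]
    )
    return result.returncode == 0


# Exit code 0 stands in for "business day", exit code 1 for "not a business day".
print(run_check(0))
print(run_check(1))
```

In schedule.py, every `sys.exit(1)` path (wrong arguments, unsupported country, non-business day, parsing error) therefore stops the flow before the log task.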
The Pebble expression "{{trigger.date ?? inputs.date}}" ensures that the flow uses the schedule date from the trigger when one is available and otherwise falls back to the date provided in the date input at runtime. You can use the inputs to test the logic.
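The fallback can be pictured in plain Python. This is only a sketch of the null-coalescing idea, assuming that a missing trigger date behaves like `None`; the function name `resolve_date` and the scheduled timestamp below are hypothetical and are not part of Kestra or Pebble.

```python
from typing import Optional


def resolve_date(trigger_date: Optional[str], input_date: Optional[str]) -> Optional[str]:
    """Return trigger_date unless it is missing (None); otherwise return input_date."""
    return trigger_date if trigger_date is not None else input_date


# Scheduled run: the trigger supplies a date, so the input is ignored.
print(resolve_date("2023-12-25T14:00:00Z", "2023-12-24T14:00:00.000Z"))

# Manual run: there is no trigger date, so the input value is used.
print(resolve_date(None, "2023-12-24T14:00:00.000Z"))
```

This is why a manual execution with a custom date input is a convenient way to test the script without waiting for the schedule.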
Make sure to adjust the Python script to match your desired country.
To add the Python script, go to the VS Code Editor in the Kestra UI and add the script schedule.py:
import sys
from datetime import datetime

from workalendar.europe import France  # Import calendars for specific countries
from workalendar.usa import UnitedStates  # Example for another country


def is_business_day(date_str, country_calendar):
    # Remove the 'Z' suffix from Kestra's timestamp and parse the date string
    date = datetime.fromisoformat(date_str.replace("Z", ""))
    # Check if the date is a business day
    return country_calendar.is_working_day(date)


def main():
    # Expect exactly two arguments: the date and the country code
    if len(sys.argv) != 3:
        print("Usage: schedule.py <date> <country_code>")
        sys.exit(1)

    date_str, country_code = sys.argv[1], sys.argv[2]

    # Dictionary mapping country codes to calendar objects
    calendars = {
        "FR": France(),
        "US": UnitedStates(),
    }

    country_calendar = calendars.get(country_code.upper())
    if not country_calendar:
        print(f"Calendar for '{country_code}' not supported or not found.")
        sys.exit(1)

    try:
        if not is_business_day(date_str, country_calendar):
            print(f"{date_str} is not a business day in {country_code}.")
            # A non-zero exit code fails the task and stops the flow
            sys.exit(1)
        else:
            print(f"{date_str} is a business day in {country_code}.")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
This way, you can store and manage your custom scripts using Namespace Files rather than pasting them inline in YAML.
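To try the date handling locally without installing workalendar, a standard-library sketch such as the one below can help. It is not workalendar's logic: it only rejects weekends plus a hand-written holiday set, and the names `SAMPLE_HOLIDAYS`, `parse_kestra_timestamp`, and `is_weekday_and_not_holiday` are hypothetical.

```python
from datetime import date, datetime

# Hypothetical, hand-maintained holiday list used only for this illustration.
SAMPLE_HOLIDAYS = {date(2023, 12, 25)}


def parse_kestra_timestamp(date_str: str) -> datetime:
    """Drop the 'Z' suffix and parse the ISO 8601 string, as schedule.py does."""
    return datetime.fromisoformat(date_str.replace("Z", ""))


def is_weekday_and_not_holiday(date_str: str) -> bool:
    """Return True for Monday to Friday dates that are not in SAMPLE_HOLIDAYS."""
    parsed = parse_kestra_timestamp(date_str)
    return parsed.weekday() < 5 and parsed.date() not in SAMPLE_HOLIDAYS


# The default `date` input of the flow falls on a Sunday.
print(is_weekday_and_not_holiday("2023-12-24T14:00:00.000Z"))

# A regular Friday for comparison.
print(is_weekday_and_not_holiday("2023-12-22T14:00:00.000Z"))
```

With the flow's default date input the check fails, which makes that default a handy way to confirm that a non-business day stops the execution.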