Source
yaml
id: file-processing
namespace: tutorial
description: File Processing
variables:
file_id: 202403
tasks:
- id: get_zipfile
type: io.kestra.plugin.core.http.Download
uri: https://divvy-tripdata.s3.amazonaws.com/{{ render(vars.file_id)
}}-divvy-tripdata.zip
- id: unzip
type: io.kestra.plugin.compress.ArchiveDecompress
algorithm: ZIP
from: "{{ outputs.get_zipfile.uri }}"
- id: csv_to_ion
type: io.kestra.plugin.serdes.csv.CsvToIon
from: "{{outputs.unzip.files[render(vars.file_id) ~ '-divvy-tripdata.csv']}}"
- id: to_parquet
type: io.kestra.plugin.serdes.avro.IonToAvro
from: "{{ outputs.csv_to_ion.uri }}"
datetimeFormat: yyyy-MM-dd' 'HH:mm:ss
schema: >
{
"type": "record",
"name": "Ride",
"namespace": "com.example.bikeshare",
"fields": [
{"name": "ride_id", "type": "string"},
{"name": "rideable_type", "type": "string"},
{"name": "started_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "ended_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "start_station_name", "type": "string"},
{"name": "start_station_id", "type": "string"},
{"name": "end_station_name", "type": "string"},
{"name": "end_station_id", "type": "string"},
{"name": "start_lat", "type": "double"},
{"name": "start_lng", "type": "double"},
{
"name": "end_lat",
"type": ["null", "double"],
"default": null
},
{
"name": "end_lng",
"type": ["null", "double"],
"default": null
},
{"name": "member_casual", "type": "string"}
]
}
About this blueprint
Getting Started Software Engineering API
This flow is a simple example of a file processing use case. It downloads a zip file, unzips it, reads a CSV file, and writes the data to a Parquet file, ensuring that the data follows a specific schema. The flow has four tasks: 1. The first task downloads a zip file. 2. The second task unzips the file. 3. The third task reads the CSV file. 4. The fourth task writes the data to a Parquet file.
More Related Blueprints