Schema Config
Pipeline
| Column | Data Type | PK | Description |
|---|---|---|---|
| pipeline_name | STRING | Y | Pipeline name |
| set_date | INTEGER | | Offset subtracted from the process date before running |
| frequency | STRING | | Frequency code: 'D', 'W', 'M', 'Q', 'Y' (daily, weekly, monthly, quarterly, yearly) |
| data_frequency | STRING | | Frequency code of the data: 'D', 'W', 'M', 'Q', 'Y' |
| active_flag | BOOLEAN | | Active flag for this config row |
| update_by | STRING | | User who added or last updated this config row |
| update_date | DATETIME | | Datetime when this config row was last updated |
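The unit of `set_date` is not stated. This minimal sketch assumes it counts days, the same unit the dependency query applies to `dependency_set_date` through `DATEADD(DAY, ...)`. The helper name `effective_process_date` is hypothetical.

```python
from datetime import date, timedelta

# Hypothetical helper. Assumption: set_date is a number of days.
def effective_process_date(process_date: date, set_date: int) -> date:
    """Shift the process date back by set_date days before running."""
    return process_date - timedelta(days=set_date)

# A run on 2024-03-01 with set_date = 1 processes the previous day.
print(effective_process_date(date(2024, 3, 1), 1))  # 2024-02-29
```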
Node Group
| Column | Data Type | PK | Description |
|---|---|---|---|
| node_group_name | STRING | Y | Node process group name |
| pipeline_name | STRING | | Name of the pipeline this node group belongs to |
| priority | INTEGER | | |
| active_flag | BOOLEAN | | Active flag for this config row |
| update_by | STRING | | User who added or last updated this config row |
| update_date | DATETIME | | Datetime when this config row was last updated |
Node
| Column | Data Type | PK | Description |
|---|---|---|---|
| node_name | STRING | Y | Node process name |
| node_group_name | STRING | | Name of the node process group this node belongs to |
| priority | INTEGER | | |
| load_type | STRING | | |
| data_load_type | STRING | | T, D, F, SCD1, SCD2_D, SCD2_F, SCD2_T |
| extras | JSON | | Node arguments (see the note below) |
| active_flag | BOOLEAN | | Active flag for this config row |
| update_by | STRING | | User who added or last updated this config row |
| update_date | DATETIME | | Datetime when this config row was last updated |
Node Dependency
| Column | Data Type | PK | Description |
|---|---|---|---|
| node_name | STRING | Y | Node process name |
| node_dependency_name | STRING | Y | Name of the node process this node depends on |
| dependency_set_date | INTEGER | | Number of days subtracted from the process date when checking the dependency's run |
| active_flag | BOOLEAN | | Active flag for this config row |
| update_by | STRING | | User who added or last updated this config row |
| update_date | DATETIME | | Datetime when this config row was last updated |
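As a sketch only, the four config tables could be declared in SQLite as below, driven from Python's standard `sqlite3` module. SQLite has no schemas, so the `control` schema used by the queries is emulated with an attached in-memory database. `STRING` and `JSON` become `TEXT`, and the `CHECK` on the frequency codes is an assumption rather than a documented rule.

```python
import sqlite3

# Assumed SQLite translation of the config schema; the production engine may differ.
DDL = """
CREATE TABLE control.control_pipeline (
    pipeline_name  TEXT PRIMARY KEY,
    set_date       INTEGER,
    frequency      TEXT CHECK (frequency IN ('D', 'W', 'M', 'Q', 'Y')),
    data_frequency TEXT CHECK (data_frequency IN ('D', 'W', 'M', 'Q', 'Y')),
    active_flag    BOOLEAN,
    update_by      TEXT,
    update_date    DATETIME
);
CREATE TABLE control.control_node_group (
    node_group_name TEXT PRIMARY KEY,
    pipeline_name   TEXT,
    priority        INTEGER,
    active_flag     BOOLEAN,
    update_by       TEXT,
    update_date     DATETIME
);
CREATE TABLE control.control_node (
    node_name       TEXT PRIMARY KEY,
    node_group_name TEXT,
    priority        INTEGER,
    load_type       TEXT,
    data_load_type  TEXT,
    extras          TEXT,  -- JSON kept as text in SQLite
    active_flag     BOOLEAN,
    update_by       TEXT,
    update_date     DATETIME
);
CREATE TABLE control.control_node_dependency (
    node_name            TEXT,
    node_dependency_name TEXT,
    dependency_set_date  INTEGER,
    active_flag          BOOLEAN,
    update_by            TEXT,
    update_date          DATETIME,
    PRIMARY KEY (node_name, node_dependency_name)
);
"""

def create_config_tables() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # Emulate the "control" schema with an attached in-memory database.
    conn.execute("ATTACH DATABASE ':memory:' AS control")
    conn.executescript(DDL)
    return conn

conn = create_config_tables()
tables = [r[0] for r in conn.execute(
    "SELECT name FROM control.sqlite_master WHERE type = 'table' ORDER BY name")]
print(tables)
```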
Note
The `extras` field on the node table contains all of the node's arguments:
```text
extras
├─ files
│  ├─ name
│  ├─ path
│  ├─ header
│  ╰─ encoding
├─ storage
│  ├─ system
│  ╰─ container
╰─ framework
   ├─ archiving
   ╰─ timeout
```
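A hypothetical `extras` payload following that tree, with a small helper for reading nested arguments by dotted path. Every value and the helper `get_arg` are illustrative; only the `files` and `storage` branches are filled in.

```python
import json

# Illustrative extras value for one node; none of these values come from the source.
extras_json = """
{
  "files": {"name": "sales_*.csv", "path": "landing/sales", "header": true, "encoding": "utf-8"},
  "storage": {"system": "local", "container": "raw"}
}
"""

def get_arg(extras: dict, path: str, default=None):
    """Read a nested argument such as 'files.encoding'; return default if missing."""
    node = extras
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node

extras = json.loads(extras_json)
print(get_arg(extras, "files.encoding"))         # utf-8
print(get_arg(extras, "storage.container"))      # raw
print(get_arg(extras, "storage.region", "n/a"))  # n/a
```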
Schema Data
Pipeline Watermark (Merge)
| Column | Data Type | PK | Description |
|---|---|---|---|
| pipeline_name | STRING | Y | Pipeline name |
| process_date | DATETIME | | Process datetime of the latest run |
| next_process_date | DATETIME | | |
| previous_process_date | DATETIME | | |
| data_date | DATETIME | | |
| next_data_date | DATETIME | | |
| previous_data_date | DATETIME | | |
| running_flag | BOOLEAN | | Flag tracking whether this pipeline is currently running |
| update_by | STRING | | User who added or last updated this row |
| update_date | DATETIME | | Datetime when this row was last updated |
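The "(Merge)" label suggests one row per `pipeline_name` that is updated in place. A minimal sketch of that pattern as a SQLite upsert (`INSERT ... ON CONFLICT DO UPDATE`, SQLite 3.24 or later); the table name `pipeline_watermark`, the trimmed column set, and the sample values are all assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, trimmed watermark table keyed by pipeline_name.
conn.execute("""
    CREATE TABLE pipeline_watermark (
        pipeline_name TEXT PRIMARY KEY,
        process_date  TEXT,
        running_flag  INTEGER,
        update_by     TEXT,
        update_date   TEXT
    )
""")

# Insert a new pipeline row, or overwrite the existing one (merge on the PK).
MERGE_SQL = """
    INSERT INTO pipeline_watermark
        (pipeline_name, process_date, running_flag, update_by, update_date)
    VALUES (?, ?, ?, ?, datetime('now'))
    ON CONFLICT (pipeline_name) DO UPDATE SET
        process_date = excluded.process_date,
        running_flag = excluded.running_flag,
        update_by    = excluded.update_by,
        update_date  = excluded.update_date
"""

def merge_watermark(name: str, process_date: str, running: bool, user: str) -> None:
    conn.execute(MERGE_SQL, (name, process_date, int(running), user))

merge_watermark("sales_daily", "2024-03-01", True, "scheduler")   # run starts
merge_watermark("sales_daily", "2024-03-01", False, "scheduler")  # run ends
rows = conn.execute("SELECT pipeline_name, running_flag FROM pipeline_watermark").fetchall()
print(rows)  # [('sales_daily', 0)]
```

The second call updates the existing row instead of adding one, so the table keeps exactly one watermark per pipeline.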
Pipeline Logging (Append Only)
| Column | Data Type | PK | Description |
|---|---|---|---|
| pipeline_name | STRING | Y | Pipeline name |
| process_id | INTEGER | Y | Process ID |
| orchestrate_id | INTEGER | Y | Run ID from the orchestration tool |
| process_ts | TIMESTAMP | | Process timestamp |
| process_date | DATETIME | | Process datetime |
| status | STRING | | 'Success', 'Failed', 'Start' |
| log | STRING | | |
| tracking | JSON | | |
| update_stage | STRING | | |
| update_by | STRING | | User who added this row |
| update_date | DATETIME | | Datetime when this row was added |
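An append-only log never updates a row: each status change is a new row, and the current status of a run is its newest row. The sketch below assumes that reading. It does not declare the documented PK as a unique constraint, because keeping both a 'Start' and a final row per run (which the Node log query's per-key `MIN`/`MAX` also implies) would violate it. The table name `log_pipeline` and the sample rows are hypothetical; `ROW_NUMBER()` needs SQLite 3.25 or later.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, trimmed append-only log table.
conn.execute("""
    CREATE TABLE log_pipeline (
        pipeline_name  TEXT,
        process_id     INTEGER,
        orchestrate_id INTEGER,
        process_ts     TEXT,
        status         TEXT CHECK (status IN ('Success', 'Failed', 'Start'))
    )
""")
conn.executemany(
    "INSERT INTO log_pipeline VALUES (?, ?, ?, ?, ?)",
    [
        ("sales_daily", 1, 100, "2024-03-01 01:00:00", "Start"),
        ("sales_daily", 1, 100, "2024-03-01 01:05:00", "Failed"),
        ("sales_daily", 2, 101, "2024-03-01 02:00:00", "Start"),
        ("sales_daily", 2, 101, "2024-03-01 02:07:00", "Success"),
    ],
)

# Current status per run = the row with the greatest process_ts for that run.
LATEST_SQL = """
    SELECT pipeline_name, process_id, orchestrate_id, status
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY pipeline_name, process_id, orchestrate_id
                   ORDER BY process_ts DESC
               ) AS rn
        FROM log_pipeline
    )
    WHERE rn = 1
    ORDER BY process_id
"""
latest = conn.execute(LATEST_SQL).fetchall()
print(latest)
# [('sales_daily', 1, 100, 'Failed'), ('sales_daily', 2, 101, 'Success')]
```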
Node Logging (Append Only)
| Column | Data Type | PK | Description |
|---|---|---|---|
| node_name | STRING | Y | Node process name |
| process_id | INTEGER | Y | Process ID |
| orchestrate_id | INTEGER | Y | Run ID from the orchestration tool |
| process_ts | TIMESTAMP | | Process timestamp |
| process_date | DATETIME | | Process datetime |
| status | STRING | | 'Success', 'Failed', 'Start' |
| log | STRING | | |
| records | JSON | | Result record counts (see the note below) |
| update_stage | STRING | | |
| update_by | STRING | | User who added this row |
| update_date | DATETIME | | Datetime when this row was added |
Note
The `records` field on the node log table contains the result record counts:
```text
records
├─ count
╰─ control_count
```
If you select the log rows whose `records` is not null and group them by `update_stage`, you get a record trace like this:
```text
records
├─ src
│  ├─ count
│  ╰─ control_count
├─ stg ...
├─ stg ...
╰─ tgt
   ├─ count
   ╰─ control_count
```
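The grouping step can be sketched in Python as follows. The row shape, the stage names, and the rule that a later row for the same stage replaces an earlier one are assumptions; the source does not say how repeated stages are combined.

```python
import json

# Hypothetical node-log rows as (update_stage, records) pairs; records is a
# JSON string or None. Stage names and counts are illustrative only.
log_rows = [
    ("src", '{"count": 1000, "control_count": 1000}'),
    ("stg_clean", '{"count": 998, "control_count": 1000}'),
    ("tgt", None),  # e.g. a status-only row with no records
    ("tgt", '{"count": 998, "control_count": 998}'),
]

def build_trace(rows):
    """Drop rows with null records, then key the parsed records by update_stage.

    Assumption: if a stage appears more than once, the later row wins.
    """
    trace = {}
    for stage, records in rows:
        if records is None:
            continue
        trace[stage] = json.loads(records)
    return trace

trace = build_trace(log_rows)
for stage, rec in trace.items():
    print(stage, rec["count"], rec["control_count"])
```

Reading the trace from `src` to `tgt` shows where counts diverge from `control_count`; in this toy data, two rows are lost in the staging step.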
Query
```sql
SELECT
    pipe.pipeline_name AS pipeline_name
    , node_group.node_group_name AS node_group_name
    , node.*
FROM control.control_pipeline AS pipe
-- a node group points to its pipeline through node_group.pipeline_name
JOIN control.control_node_group AS node_group
    ON pipe.pipeline_name = node_group.pipeline_name
-- a node points to its group through node.node_group_name
JOIN control.control_node AS node
    ON node_group.node_group_name = node.node_group_name
WHERE
    pipe.active_flag IS TRUE
    AND node_group.active_flag IS TRUE
    AND node.active_flag IS TRUE
```
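The join path follows the foreign-key columns of the config tables: `node_group.pipeline_name` and `node.node_group_name`. Below it runs on toy data in SQLite via `sqlite3`; the `control` schema is an attached in-memory database, columns are trimmed, the select list is narrowed to names, and all sample rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS control")
conn.executescript("""
CREATE TABLE control.control_pipeline (pipeline_name TEXT PRIMARY KEY, active_flag BOOLEAN);
CREATE TABLE control.control_node_group (node_group_name TEXT PRIMARY KEY, pipeline_name TEXT, active_flag BOOLEAN);
CREATE TABLE control.control_node (node_name TEXT PRIMARY KEY, node_group_name TEXT, active_flag BOOLEAN);
INSERT INTO control.control_pipeline VALUES ('sales_daily', 1);
INSERT INTO control.control_node_group VALUES ('ingest', 'sales_daily', 1), ('publish', 'sales_daily', 0);
INSERT INTO control.control_node VALUES
    ('load_orders', 'ingest', 1), ('load_refunds', 'ingest', 0), ('export_report', 'publish', 1);
""")

# Only active nodes in active groups of active pipelines survive the filters.
rows = conn.execute("""
    SELECT pipe.pipeline_name, node_group.node_group_name, node.node_name
    FROM control.control_pipeline AS pipe
    JOIN control.control_node_group AS node_group
        ON pipe.pipeline_name = node_group.pipeline_name
    JOIN control.control_node AS node
        ON node_group.node_group_name = node.node_group_name
    WHERE pipe.active_flag IS TRUE
      AND node_group.active_flag IS TRUE
      AND node.active_flag IS TRUE
""").fetchall()
print(rows)  # [('sales_daily', 'ingest', 'load_orders')]
```

`export_report` is dropped even though the node itself is active, because its group `publish` is inactive.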
Node log
```sql
SELECT
    node_name
    , process_id
    , orchestrate_id
    , MIN(process_date) AS start_date
    , MAX(process_date) AS end_date
FROM control.log_node
GROUP BY
    node_name
    , process_id
    , orchestrate_id
```
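The same aggregation on toy rows, run in SQLite through `sqlite3`. The rows are hypothetical and the columns trimmed; datetimes are stored as ISO-8601 strings so that `MIN`/`MAX` order them correctly.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS control")
conn.execute("""
    CREATE TABLE control.log_node (
        node_name TEXT, process_id INTEGER, orchestrate_id INTEGER,
        process_date TEXT, status TEXT
    )
""")
conn.executemany(
    "INSERT INTO control.log_node VALUES (?, ?, ?, ?, ?)",
    [
        ("load_orders", 1, 100, "2024-03-01 01:00:00", "Start"),
        ("load_orders", 1, 100, "2024-03-01 01:04:30", "Success"),
        ("load_refunds", 2, 100, "2024-03-01 01:00:10", "Start"),
    ],
)
# One row per run: earliest and latest logged process_date.
rows = conn.execute("""
    SELECT node_name, process_id, orchestrate_id,
           MIN(process_date) AS start_date, MAX(process_date) AS end_date
    FROM control.log_node
    GROUP BY node_name, process_id, orchestrate_id
    ORDER BY node_name
""").fetchall()
for row in rows:
    print(row)
```

A run that has logged only its 'Start' row (here `load_refunds`) reports `start_date` equal to `end_date`, so the query alone cannot tell a finished run from one still in progress; the `status` column is needed for that.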
Node dependency needed
```sql
SELECT
    node_deps.node_dependency_name AS node_deps_name
    , CASE WHEN MAX(COALESCE(log.process_id, -999)) = -999 THEN 'Need'
           ELSE 'Pass'
      END AS latest_process_id
FROM control.control_node AS node
INNER JOIN control.control_node_dependency AS node_deps
    ON node.node_name = node_deps.node_name
-- Filters on the log stay in ON (not WHERE) so dependencies without a
-- matching successful run are kept as NULL rows and reported as 'Need'.
LEFT JOIN control.log_node AS log
    ON log.node_name = node_deps.node_dependency_name
    AND log.status = 'Success'
    AND '{process-date}' = DATEADD(DAY, node_deps.dependency_set_date, log.process_date)
GROUP BY node_deps.node_dependency_name
```
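A sketch of the dependency check in SQLite. SQLite has no `DATEADD`, so `date(x, '+N days')` stands in for `DATEADD(DAY, N, x)`, and the `'{process-date}'` placeholder becomes the bound parameter `:process_date`. The `status` and date filters sit in the `LEFT JOIN`'s `ON` clause: placed in `WHERE`, they would discard the NULL rows of dependencies without a matching log, and 'Need' could never be returned. Table contents are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS control")
conn.executescript("""
CREATE TABLE control.control_node (node_name TEXT PRIMARY KEY);
CREATE TABLE control.control_node_dependency (
    node_name TEXT, node_dependency_name TEXT, dependency_set_date INTEGER,
    PRIMARY KEY (node_name, node_dependency_name)
);
CREATE TABLE control.log_node (
    node_name TEXT, process_id INTEGER, status TEXT, process_date TEXT
);
INSERT INTO control.control_node VALUES ('build_mart');
INSERT INTO control.control_node_dependency VALUES
    ('build_mart', 'load_orders', 0),
    ('build_mart', 'load_customers', 1);
INSERT INTO control.log_node VALUES
    ('load_orders', 10, 'Success', '2024-03-01'),
    ('load_customers', 11, 'Failed', '2024-02-29');
""")

DEPS_SQL = """
    SELECT
        node_deps.node_dependency_name AS node_deps_name,
        CASE WHEN MAX(COALESCE(log.process_id, -999)) = -999 THEN 'Need'
             ELSE 'Pass'
        END AS latest_process_id
    FROM control.control_node AS node
    INNER JOIN control.control_node_dependency AS node_deps
        ON node.node_name = node_deps.node_name
    LEFT JOIN control.log_node AS log
        ON  log.node_name = node_deps.node_dependency_name
        AND log.status = 'Success'
        -- date(x, '+N days') emulates DATEADD(DAY, N, x)
        AND :process_date = date(log.process_date,
                                 '+' || node_deps.dependency_set_date || ' days')
    GROUP BY node_deps.node_dependency_name
    ORDER BY node_deps_name
"""
result = dict(conn.execute(DEPS_SQL, {"process_date": "2024-03-01"}).fetchall())
print(result)  # {'load_customers': 'Need', 'load_orders': 'Pass'}
```

`load_customers` only has a failed run, so it has no matching row and comes back as 'Need' instead of disappearing from the result.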