Logical Relations¶
Read Operator¶
The read operator produces one output by reading data from an external source. A simple example would be reading a Parquet file. It is expected that many types of reads will be added over time.
Signature | Value |
---|---|
Inputs | 0 |
Outputs | 1 |
Property Maintenance | N/A (no inputs) |
Direct Output Order | Defaults to the schema of the data read after the optional projection (masked complex expression) is applied. |
Read Properties¶
Property | Description | Required |
---|---|---|
Definition | The contents of the read property definition. | Required |
Direct Schema | Defines the schema of the output of the read (before any projection or emit remapping/hiding). | Required |
Filter | A boolean Substrait expression that describes a filter that must be applied to the data. The filter should be interpreted against the direct schema. | Optional, defaults to none. |
Best Effort Filter | A boolean Substrait expression that describes a filter that may be applied to the data. The filter should be interpreted against the direct schema. | Optional, defaults to none. |
Projection | A masked complex expression describing the portions of the content that should be read | Optional, defaults to all of schema |
Output Properties | Declaration of orderedness and/or distribution properties this read produces. | Optional, defaults to no properties. |
Properties | A list of name/value pairs associated with the read. | Optional, defaults to empty |
Read Filtering¶
The read relation has two different filter properties: a filter, which must be satisfied by the operator, and a best effort filter, which does not have to be satisfied. This reflects the way that consumers are often implemented. A consumer is often only able to fully apply a limited set of operations in the scan. There can then be an extended set of operations which a consumer can apply in a best effort fashion. A producer, when setting these two fields, should take care to only use expressions that the consumer is capable of handling.
As an example, a consumer may only be able to fully apply (in the read relation) `<`, `=`, and `>` on integral types. The consumer may be able to apply `<`, `=`, and `>` in a best effort fashion on decimal and string types. Consider the filter expression `my_int < 10 && my_string < "x" && upper(my_string) > "B"`. In this case the `filter` should be set to `my_int < 10`, the `best_effort_filter` should be set to `my_string < "x"`, and the remaining portion (`upper(my_string) > "B"`) should be put into a filter relation.
A filter expression must be interpreted against the direct schema before the projection expression has been applied. As a result, fields may be referenced by the filter expression which are not included in the relation’s output.
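The split described above amounts to partitioning the conjuncts of a predicate among three destinations. The following is a hypothetical sketch: the capability predicates are stand-ins for whatever the producer actually knows about the consumer, and are not part of the specification.

```python
# Hypothetical sketch: a producer partitioning the conjuncts of a
# predicate into the three destinations described above. The capability
# predicates are assumptions for illustration only.

def split_conjuncts(conjuncts, fully_supported, best_effort_supported):
    """Return (filter, best_effort_filter, post_scan_filter) lists."""
    guaranteed, best_effort, remaining = [], [], []
    for c in conjuncts:
        if fully_supported(c):
            guaranteed.append(c)   # goes into ReadRel.filter
        elif best_effort_supported(c):
            best_effort.append(c)  # goes into ReadRel.best_effort_filter
        else:
            remaining.append(c)    # goes into a FilterRel above the read
    return guaranteed, best_effort, remaining

# Mirroring the example above: comparisons on integers are fully
# supported, comparisons on strings only best-effort, function calls
# not at all.
conjuncts = ['my_int < 10', 'my_string < "x"', 'upper(my_string) > "B"']
f, b, r = split_conjuncts(
    conjuncts,
    fully_supported=lambda c: c.startswith("my_int"),
    best_effort_supported=lambda c: c.startswith("my_string"),
)
```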
Read Definition Types¶
Adding new Read Definition Types
If you have a read definition that’s not covered here, see the process for adding new read definition types.
Read definition types (like the rest of the features in Substrait) are built by the community and added to the specification.
Virtual Table¶
A virtual table is a table whose contents are embedded in the plan itself. The table data is encoded as records consisting of literal values or expressions that can be resolved without referencing any input data. For example, a literal, a function call involving literals, or any other expression that does not require input.
Property | Description | Required |
---|---|---|
Data | The table data, expressed as records of literal values or expressions that can be resolved without referencing input data. | Required |
Named Table¶
A named table is a reference to data defined elsewhere. For example, there may be a catalog of tables with unique names that both the producer and consumer agree on. This catalog would provide the consumer with more information on how to retrieve the data.
Property | Description | Required |
---|---|---|
Names | A list of namespaced strings that, together, form the table name | Required (at least one) |
Files Type¶
Property | Description | Required |
---|---|---|
Items | An array of Items (path or path glob) associated with the read. | Required |
Format per item | The file format along with format-specific read options (e.g., Parquet, Arrow, ORC, DWRF, delimiter-separated text). | Required |
Slicing parameters per item | Information to use when reading a slice of a file. | Optional |
Slicing Files¶
A read operation is allowed to only read part of a file. This is convenient, for example, when distributing a read operation across several nodes. The slicing parameters are specified as byte offsets into the file.
Many file formats consist of indivisible “chunks” of data (e.g. Parquet row groups). When this happens, the consumer must determine which slice a particular chunk belongs to. For example, one possible approach is that a chunk should only be read if the midpoint of the chunk (chunk start plus half the chunk length, rounding down) is contained within the asked-for byte range.
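The midpoint rule can be sketched as follows. This is one possible consumer policy, not something mandated by the specification:

```python
# Sketch of the midpoint rule: a chunk is assigned to the slice whose
# [start, start + length) byte range contains the chunk's midpoint.

def chunks_for_slice(chunks, start, length):
    """chunks: (chunk_start, chunk_length) byte ranges within the file."""
    selected = []
    for chunk_start, chunk_length in chunks:
        midpoint = chunk_start + chunk_length // 2  # divide by 2, round down
        if start <= midpoint < start + length:
            selected.append((chunk_start, chunk_length))
    return selected

# Two 100-byte row groups. Even though the slice boundary at byte 120
# falls inside the second chunk, each chunk is read by exactly one slice.
chunks = [(0, 100), (100, 100)]
first = chunks_for_slice(chunks, 0, 120)    # midpoint 50 is in [0, 120)
second = chunks_for_slice(chunks, 120, 80)  # midpoint 150 is in [120, 200)
```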
```proto
message ReadRel {
  RelCommon common = 1;
  NamedStruct base_schema = 2;
  Expression filter = 3;
  Expression best_effort_filter = 11;
  Expression.MaskExpression projection = 4;
  substrait.extensions.AdvancedExtension advanced_extension = 10;

  // Definition of which type of scan operation is to be performed
  oneof read_type {
    VirtualTable virtual_table = 5;
    LocalFiles local_files = 6;
    NamedTable named_table = 7;
    ExtensionTable extension_table = 8;
  }

  // A base table. The list of string is used to represent namespacing (e.g., mydb.mytable).
  // This assumes shared catalog between systems exchanging a message.
  message NamedTable {
    repeated string names = 1;
    substrait.extensions.AdvancedExtension advanced_extension = 10;
  }

  // A table composed of expressions.
  message VirtualTable {
    repeated Expression.Literal.Struct values = 1 [deprecated = true];
    repeated Expression.Nested.Struct expressions = 2;
  }

  // A stub type that can be used to extend/introduce new table types outside
  // the specification.
  message ExtensionTable {
    google.protobuf.Any detail = 1;
  }

  // Represents a list of files in input of a scan operation
  message LocalFiles {
    repeated FileOrFiles items = 1;
    substrait.extensions.AdvancedExtension advanced_extension = 10;

    // Many files consist of indivisible chunks (e.g. parquet row groups
    // or CSV rows). If a slice partially selects an indivisible chunk
    // then the consumer should employ some rule to decide which slice to
    // include the chunk in (e.g. include it in the slice that contains
    // the midpoint of the chunk).
    message FileOrFiles {
      oneof path_type {
        // A URI that can refer to either a single folder or a single file
        string uri_path = 1;
        // A URI where the path portion is a glob expression that can
        // identify zero or more paths.
        // Consumers should support the POSIX syntax. The recursive
        // globstar (**) may not be supported.
        string uri_path_glob = 2;
        // A URI that refers to a single file
        string uri_file = 3;
        // A URI that refers to a single folder
        string uri_folder = 4;
      }

      // Original file format enum, superseded by the file_format oneof.
      reserved 5;
      reserved "format";

      // The index of the partition this item belongs to
      uint64 partition_index = 6;
      // The start position in bytes to read from this item
      uint64 start = 7;
      // The length in bytes to read from this item
      uint64 length = 8;

      message ParquetReadOptions {}
      message ArrowReadOptions {}
      message OrcReadOptions {}
      message DwrfReadOptions {}
      message DelimiterSeparatedTextReadOptions {
        // Delimiter separated files may be compressed. The reader should
        // autodetect this and decompress as needed.
        // The character(s) used to separate fields. Common values are comma,
        // tab, and pipe. Multiple characters are allowed.
        string field_delimiter = 1;
        // The maximum number of bytes to read from a single line. If a line
        // exceeds this limit the resulting behavior is undefined.
        uint64 max_line_size = 2;
        // The character(s) used to quote strings. Common values are single
        // and double quotation marks.
        string quote = 3;
        // The number of lines to skip at the beginning of the file.
        uint64 header_lines_to_skip = 4;
        // The character used to escape characters in strings. Backslash is
        // a common value. Note that a double quote mark can also be used as an
        // escape character but the external quotes should be removed first.
        string escape = 5;
        // If this value is encountered (including empty string), the resulting
        // value is null instead. Leave unset to disable. If this value is
        // provided, the effective schema of this file is comprised entirely of
        // nullable strings. If not provided, the effective schema is instead
        // made up of non-nullable strings.
        optional string value_treated_as_null = 6;
      }

      // The format of the files along with options for reading those files.
      oneof file_format {
        ParquetReadOptions parquet = 9;
        ArrowReadOptions arrow = 10;
        OrcReadOptions orc = 11;
        google.protobuf.Any extension = 12;
        DwrfReadOptions dwrf = 13;
        DelimiterSeparatedTextReadOptions text = 14;
      }
    }
  }
}
```
Filter Operation¶
The filter operator eliminates one or more records from the input data based on a boolean filter expression.
Signature | Value |
---|---|
Inputs | 1 |
Outputs | 1 |
Property Maintenance | Orderedness, Distribution, remapped by emit |
Direct Output Order | The field order of the input. |
Filter Properties¶
Property | Description | Required |
---|---|---|
Input | The relational input. | Required |
Expression | A boolean expression which describes which records are included/excluded. | Required |
```proto
message FilterRel {
  RelCommon common = 1;
  Rel input = 2;
  Expression condition = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;
}
```
Sort Operation¶
The sort operator reorders a dataset based on one or more identified sort fields and a sorting function for each.
Signature | Value |
---|---|
Inputs | 1 |
Outputs | 1 |
Property Maintenance | Will update orderedness property to the output of the sort operation. Distribution property only remapped based on emit. |
Direct Output Order | The field order of the input. |
Sort Properties¶
Property | Description | Required |
---|---|---|
Input | The relational input. | Required |
Sort Fields | List of one or more fields to sort by. Uses the same properties as the orderedness property. | One sort field required |
```proto
message SortRel {
  RelCommon common = 1;
  Rel input = 2;
  repeated SortField sorts = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;
}
```
Project Operation¶
The project operation will produce one or more additional expressions based on the inputs of the dataset.
Signature | Value |
---|---|
Inputs | 1 |
Outputs | 1 |
Property Maintenance | Distribution maintained, mapped by emit. Orderedness: Maintained if no window operations. Extended to include projection fields if fields are direct references. If window operations are present, no orderedness is maintained. |
Direct Output Order | The field order of the input + the list of new expressions in the order they are declared in the expressions list. |
Project Properties¶
Property | Description | Required |
---|---|---|
Input | The relational input. | Required |
Expressions | List of one or more expressions to add to the input. | At least one expression required |
```proto
message ProjectRel {
  RelCommon common = 1;
  Rel input = 2;
  repeated Expression expressions = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;
}
```
Cross Product Operation¶
The cross product operation will combine two separate inputs into a single output. It pairs every record from the left input with every record of the right input.
Signature | Value |
---|---|
Inputs | 2 |
Outputs | 1 |
Property Maintenance | Distribution is maintained. Orderedness is empty post operation. |
Direct Output Order | The emit order of the left input followed by the emit order of the right input. |
Cross Product Properties¶
Property | Description | Required |
---|---|---|
Left Input | A relational input. | Required |
Right Input | A relational input. | Required |
```proto
message CrossRel {
  RelCommon common = 1;
  Rel left = 2;
  Rel right = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;
}
```
Join Operation¶
The join operation will combine two separate inputs into a single output, based on a join expression. A common subtype of joins is an equality join where the join expression is constrained to a list of equality (or equality + null equality) conditions between the two inputs of the join.
Signature | Value |
---|---|
Inputs | 2 |
Outputs | 1 |
Property Maintenance | Distribution is maintained. Orderedness is empty post operation. Physical relations may provide better property maintenance. |
Direct Output Order | The emit order of the left input followed by the emit order of the right input. |
Join Properties¶
Property | Description | Required |
---|---|---|
Left Input | A relational input. | Required |
Right Input | A relational input. | Required |
Join Expression | A boolean condition that describes whether each record from the left set “matches” a record from the right set. Field references correspond to the direct output order of the data. | Required. Can be the literal True. |
Post-Join Filter | A boolean condition to be applied to each result record after the inputs have been joined, yielding only the records that satisfied the condition. | Optional |
Join Type | One of the join types defined below. | Required |
Join Types¶
Type | Description |
---|---|
Inner | Return records from the left side only if they match the right side. Return records from the right side only when they match the left side. For each cross input match, return a record including the data from both sides. Non-matching records are ignored. |
Outer | Return all records from both the left and right inputs. For each cross input match, return a record including the data from both sides. For any remaining non-match records, return the record from the corresponding input along with nulls for the opposite input. |
Left | Return all records from the left input. For each cross input match, return a record including the data from both sides. For any remaining non-matching records from the left input, return the left record along with nulls for the right input. |
Right | Return all records from the right input. For each cross input match, return a record including the data from both sides. For any remaining non-matching records from the right input, return the right record along with nulls for the left input. |
Left Semi | Returns records from the left input. These are returned only if the records have a join partner on the right side. |
Right Semi | Returns records from the right input. These are returned only if the records have a join partner on the left side. |
Left Anti | Return records from the left input. These are returned only if the records do not have a join partner on the right side. |
Right Anti | Return records from the right input. These are returned only if the records do not have a join partner on the left side. |
Left Single | Return all records from the left input with no join expansion. If at least one record from the right input matches the left, return one arbitrary matching record from the right input. For any left records without matching right records, return the left record along with nulls for the right input. Similar to a left outer join but only returns one right match at most. Useful for nested sub-queries where we need exactly one record in output (or throw exception). See Section 3.2 of https://15721.courses.cs.cmu.edu/spring2018/papers/16-optimizer2/hyperjoins-btw2017.pdf for more information. |
Right Single | Same as left single except that the right and left inputs are switched. |
Left Mark | Returns one record for each record from the left input. Appends one additional “mark” column to the output of the join. The new column will be listed after all columns from both sides and will be of type nullable boolean. If there is at least one join partner in the right input where the join condition evaluates to true then the mark column will be set to true. Otherwise, if there is at least one join partner in the right input where the join condition evaluates to NULL then the mark column will be set to NULL. Otherwise the mark column will be set to false. |
Right Mark | Returns records from the right input. Appends one additional “mark” column to the output of the join. The new column will be listed after all columns from both sides and will be of type nullable boolean. If there is at least one join partner in the left input where the join condition evaluates to true then the mark column will be set to true. Otherwise, if there is at least one join partner in the left input where the join condition evaluates to NULL then the mark column will be set to NULL. Otherwise the mark column will be set to false. |
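The three-valued mark logic above can be illustrated with a small sketch, using Python's `None` for SQL NULL. For brevity the sketch keeps only the left columns plus the mark column, whereas the relation proper lists the mark after all columns from both sides:

```python
# Illustrative sketch of left-mark join semantics: the mark is True if any
# right record matches, NULL (None) if no match but some comparison was
# NULL, and False otherwise.

def left_mark_join(left, right, condition):
    """condition(l, r) returns True, False, or None (three-valued logic)."""
    out = []
    for l in left:
        results = [condition(l, r) for r in right]
        if True in results:
            mark = True
        elif None in results:
            mark = None
        else:
            mark = False
        out.append((l, mark))
    return out

# Equality where NULL on either side yields NULL (unknown).
def eq(l, r):
    if l is None or r is None:
        return None
    return l == r
```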
```proto
message JoinRel {
  RelCommon common = 1;
  Rel left = 2;
  Rel right = 3;
  Expression expression = 4;
  Expression post_join_filter = 5;
  JoinType type = 6;

  enum JoinType {
    JOIN_TYPE_UNSPECIFIED = 0;
    JOIN_TYPE_INNER = 1;
    JOIN_TYPE_OUTER = 2;
    JOIN_TYPE_LEFT = 3;
    JOIN_TYPE_RIGHT = 4;
    JOIN_TYPE_LEFT_SEMI = 5;
    JOIN_TYPE_LEFT_ANTI = 6;
    JOIN_TYPE_LEFT_SINGLE = 7;
    JOIN_TYPE_RIGHT_SEMI = 8;
    JOIN_TYPE_RIGHT_ANTI = 9;
    JOIN_TYPE_RIGHT_SINGLE = 10;
    JOIN_TYPE_LEFT_MARK = 11;
    JOIN_TYPE_RIGHT_MARK = 12;
  }

  substrait.extensions.AdvancedExtension advanced_extension = 10;
}
```
Set Operation¶
The set operation encompasses several set-level operations that support combining datasets, possibly excluding records based on various types of record level matching.
Signature | Value |
---|---|
Inputs | 2 or more |
Outputs | 1 |
Property Maintenance | Maintains distribution if all inputs have the same ordinal distribution. Orderedness is not maintained. |
Direct Output Order | The field order of the inputs. All inputs must have identical field types, but field nullabilities may vary. |
Set Properties¶
Property | Description | Required |
---|---|---|
Primary Input | The primary input of the dataset. | Required |
Secondary Inputs | One or more relational inputs. | At least one required |
Set Operation Type | From list below. | Required |
Set Operation Types¶
The set operation type determines both the records that are emitted and the type of the output record.
For some set operations, whether a specific record is included in the output and if it appears more than once depends on the number of times it occurs across all inputs. In the following table, treat:

* m: the number of times a record occurs in the primary input (p)
* n1: the number of times a record occurs in the 1st secondary input (s1)
* n2: the number of times a record occurs in the 2nd secondary input (s2)
* …
* n: the number of times a record occurs in the nth secondary input
Operation | Description | Examples | Output Nullability |
---|---|---|---|
Minus (Primary) | Returns all records from the primary input excluding any matching rows from secondary inputs, removing duplicates. Each value is treated as a unique member of the set, so duplicates in the first set don’t affect the result. This operation maps to SQL EXCEPT DISTINCT. | MINUS p: {1, 2, 2, 3, 3, 3, 4} s1: {1, 2} s2: {3} YIELDS {4} | The same as the primary input. |
Minus (Primary All) | Returns all records from the primary input excluding any matching records from secondary inputs. For each specific record returned, the output contains max(0, m - sum(n1, n2, …, n)) copies. This operation maps to SQL EXCEPT ALL. | MINUS ALL p: {1, 2, 2, 3, 3, 3, 3} s1: {1, 2, 3, 4} s2: {3} YIELDS {2, 3, 3} | The same as the primary input. |
Minus (Multiset) | Returns all records from the primary input excluding any records that are included in all secondary inputs. This operation does not have a direct SQL mapping. | MINUS MULTISET p: {1, 2, 3, 4} s1: {1, 2} s2: {1, 2, 3} YIELDS {3, 4} | The same as the primary input. |
Intersection (Primary) | Returns all records from the primary input that are present in any secondary input, removing duplicates. This operation does not have a direct SQL mapping. | INTERSECT p: {1, 2, 2, 3, 3, 3, 4} s1: {1, 2, 3, 5} s2: {2, 3, 6} YIELDS {1, 2, 3} | If a field is nullable in the primary input and in any of the secondary inputs, it is nullable in the output. |
Intersection (Multiset) | Returns all records from the primary input that match at least one record from all secondary inputs. This operation maps to SQL INTERSECT DISTINCT | INTERSECT MULTISET p: {1, 2, 3, 4} s1: {2, 3} s2: {3, 4} YIELDS {3} | If a field is required in any of the inputs, it is required in the output. |
Intersection (Multiset All) | Returns all records from the primary input that are present in every secondary input. For each specific record returned, the output contains min(m, n1, n2, …, n) copies. This operation maps to SQL INTERSECT ALL. | INTERSECT ALL p: {1, 2, 2, 3, 3, 3, 4} s1: {1, 2, 3, 3, 5} s2: {2, 3, 3, 6} YIELDS {2, 3, 3} | If a field is required in any of the inputs, it is required in the output. |
Union Distinct | Returns all records from each set, removing duplicates. This operation maps to SQL UNION DISTINCT. | UNION p: {1, 2, 2, 3, 3, 3, 4} s1: {2, 3, 5} s2: {1, 6} YIELDS {1, 2, 3, 4, 5, 6} | If a field is nullable in any of the inputs, it is nullable in the output. |
Union All | Returns all records from all inputs. For each specific record returned, the output contains (m + n1 + n2 + … + n) copies. This operation maps to SQL UNION ALL. | UNION ALL p: {1, 2, 2, 3, 3, 3, 4} s1: {2, 3, 5} s2: {1, 6} YIELDS {1, 2, 2, 3, 3, 3, 4, 2, 3, 5, 1, 6} | If a field is nullable in any of the inputs, it is nullable in the output. |
Note that for set operations, NULL matches NULL. That is:

{NULL, 1, 3} MINUS {NULL, 2, 4} === (1), (3)
{NULL, 1, 3} INTERSECTION {NULL, 2, 4} === (NULL)
{NULL, 1, 3} UNION DISTINCT {NULL, 2, 4} === (NULL), (1), (2), (3), (4)
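The multiset counting rules above can be sketched with `collections.Counter`, treating each input as a multiset of records. `None` stands for SQL NULL, which matches NULL as just noted:

```python
# Sketch of the MINUS ALL and INTERSECT ALL counting rules:
#   MINUS ALL:     output contains max(0, m - sum(n1, n2, ..., n)) copies
#   INTERSECT ALL: output contains min(m, n1, n2, ..., n) copies
from collections import Counter

def _ordered(values):
    # Sort for a deterministic result; None (NULL) sorts last.
    return sorted(values, key=lambda x: (x is None, x))

def minus_primary_all(primary, *secondaries):
    counts = Counter(primary)
    for s in secondaries:
        counts.subtract(Counter(s))  # m - sum(n1, n2, ..., n)
    return _ordered(v for v, c in counts.items() for _ in range(max(0, c)))

def intersection_multiset_all(primary, *secondaries):
    counts = Counter(primary)
    for s in secondaries:
        counts &= Counter(s)  # Counter intersection keeps min(m, n) copies
    return _ordered(counts.elements())
```

These reproduce the examples from the table, e.g. `minus_primary_all([1, 2, 2, 3, 3, 3, 3], [1, 2, 3, 4], [3])` yields `[2, 3, 3]`.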
Output Type Derivation Examples¶
Given the following inputs, where R is Required and N is Nullable:
Input 1: (R, R, R, R, N, N, N, N) Primary Input
Input 2: (R, R, N, N, R, R, N, N) Secondary Input
Input 3: (R, N, R, N, R, N, R, N) Secondary Input
The output type is as follows for the various operations
Property | Output Type |
---|---|
Minus (Primary) | (R, R, R, R, N, N, N, N) |
Minus (Primary All) | (R, R, R, R, N, N, N, N) |
Minus (Multiset) | (R, R, R, R, N, N, N, N) |
Intersection (Primary) | (R, R, R, R, R, N, N, N) |
Intersection (Multiset) | (R, R, R, R, R, R, R, N) |
Intersection (Multiset All) | (R, R, R, R, R, R, R, N) |
Union Distinct | (R, N, N, N, N, N, N, N) |
Union All | (R, N, N, N, N, N, N, N) |
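The derivations above follow mechanically from the nullability rules in the operation table. A sketch, modeling each field's nullability as a boolean (True = nullable, False = required):

```python
# Sketch of output nullability derivation for set operations, following
# the rules stated in the operation table above.

def minus_nullability(primary, *secondaries):
    # All minus variants keep the primary input's nullability unchanged.
    return primary

def intersection_primary_nullability(primary, *secondaries):
    # Nullable only if nullable in the primary AND in some secondary input.
    return tuple(p and any(s) for p, *s in zip(primary, *secondaries))

def intersection_multiset_nullability(*inputs):
    # Required if required in any input (nullable only if nullable in all).
    return tuple(all(col) for col in zip(*inputs))

def union_nullability(*inputs):
    # Nullable if nullable in any input.
    return tuple(any(col) for col in zip(*inputs))

# The example inputs from above (True = N, False = R).
p  = (False, False, False, False, True, True, True, True)
s1 = (False, False, True, True, False, False, True, True)
s2 = (False, True, False, True, False, True, False, True)
```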
```proto
message SetRel {
  RelCommon common = 1;
  // The first input is the primary input, the remaining are secondary
  // inputs. There must be at least two inputs.
  repeated Rel inputs = 2;
  SetOp op = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;

  enum SetOp {
    SET_OP_UNSPECIFIED = 0;
    SET_OP_MINUS_PRIMARY = 1;
    SET_OP_MINUS_PRIMARY_ALL = 7;
    SET_OP_MINUS_MULTISET = 2;
    SET_OP_INTERSECTION_PRIMARY = 3;
    SET_OP_INTERSECTION_MULTISET = 4;
    SET_OP_INTERSECTION_MULTISET_ALL = 8;
    SET_OP_UNION_DISTINCT = 5;
    SET_OP_UNION_ALL = 6;
  }
}
```
Fetch Operation¶
The fetch operation eliminates records outside a desired window. It typically corresponds to a fetch/offset SQL clause and returns only records between the start offset and the end offset.
Signature | Value |
---|---|
Inputs | 1 |
Outputs | 1 |
Property Maintenance | Maintains distribution and orderedness. |
Direct Output Order | Unchanged from input. |
Fetch Properties¶
Property | Description | Required |
---|---|---|
Input | A relational input, typically with a desired orderedness property. | Required |
Offset | A non-negative integer. Declares the offset for retrieval of records. | Optional, defaults to 0. |
Count | A non-negative integer or -1. Declares the number of records that should be returned. -1 signals that ALL records should be returned. | Required |
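The offset/count window maps naturally onto list slicing; a minimal sketch:

```python
# Minimal sketch of fetch semantics: skip `offset` records, then return
# up to `count` records; count == -1 signals that ALL remaining records
# should be returned.

def fetch(records, offset=0, count=-1):
    if count == -1:
        return records[offset:]
    return records[offset:offset + count]
```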
```proto
message FetchRel {
  RelCommon common = 1;
  Rel input = 2;
  // the offset expressed in number of records
  int64 offset = 3;
  // the number of records to return
  // use -1 to signal that ALL records should be returned
  int64 count = 4;
  substrait.extensions.AdvancedExtension advanced_extension = 10;
}
```
Aggregate Operation¶
The aggregate operation groups input data on one or more sets of grouping keys, calculating each measure for each combination of grouping key.
Signature | Value |
---|---|
Inputs | 1 |
Outputs | 1 |
Property Maintenance | Maintains distribution if all distribution fields are contained in every grouping set. No orderedness guaranteed. |
Direct Output Order | The list of grouping expressions in declaration order followed by the list of measures in declaration order, followed by an i32 describing the associated particular grouping set the value is derived from (if applicable). |
In its simplest form, an aggregation has only measures. In this case, all records are folded into one, and a column is returned for each aggregate expression in the measures list.
Grouping sets can be used for finer-grained control over which records are folded. A grouping set consists of zero or more references to the list of grouping expressions. Within a grouping set, two records will be folded together if and only if they have the same values for each of the expressions in the grouping set. The values returned by the grouping expressions will be returned as columns to the left of the columns for the aggregate expressions. Each of the grouping expressions must occur in at least one of the grouping sets. If a grouping set contains no grouping expressions, all rows will be folded for that grouping set. (Having a single grouping set with no grouping expressions is thus equivalent to not having any grouping sets.)
It is possible to specify multiple grouping sets in a single aggregate operation. The grouping sets behave more or less independently, with each returned record belonging to one of the grouping sets. The values for the grouping expression columns that are not part of the grouping set for a particular record will be set to null. The columns for grouping expressions that do not appear in all grouping sets will be nullable (regardless of the nullability of the type returned by the grouping expression) to accommodate the null insertion.
To further disambiguate which record belongs to which grouping set, an aggregate relation with more than one grouping set receives an extra `i32` column on the right-hand side. The value of this field will be the zero-based index of the grouping set that yielded the record.
If at least one grouping expression is present, the aggregation is allowed to not have any aggregate expressions. An aggregate relation is invalid if it would yield zero columns.
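The folding, null-filling, and grouping-set-index behavior described above can be sketched as follows. The `measure` parameter here is a plain Python fold such as `len` for COUNT(*):

```python
# Sketch of aggregation with multiple grouping sets: each set folds the
# input independently; grouping columns absent from a set are null-filled
# (None); a trailing column carries the zero-based grouping set index.

def aggregate(rows, grouping_exprs, grouping_sets, measure):
    """grouping_sets: lists of indices into grouping_exprs.
    measure: folds a list of rows into a single value."""
    out = []
    for set_index, refs in enumerate(grouping_sets):
        groups = {}
        for row in rows:
            key = tuple(grouping_exprs[i](row) for i in refs)
            groups.setdefault(key, []).append(row)
        for key, members in groups.items():
            # Null-fill grouping columns that are not part of this set.
            cols = [None] * len(grouping_exprs)
            for i, ref in enumerate(refs):
                cols[ref] = key[i]
            out.append(tuple(cols) + (measure(members), set_index))
    return out

# Two grouping expressions, one grouping set per expression: roughly
# GROUP BY GROUPING SETS ((col0), (col1)) with a COUNT(*) measure.
rows = [("a", 1), ("a", 2), ("b", 1)]
exprs = [lambda r: r[0], lambda r: r[1]]
result = aggregate(rows, exprs, [[0], [1]], measure=len)
```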
Aggregate Properties¶
Property | Description | Required |
---|---|---|
Input | The relational input. | Required |
Grouping Sets | One or more grouping sets. | Optional, required if no measures. |
Per Grouping Set | A list of expression groupings that the aggregation measures should be calculated for. | Optional. |
Measures | A list of one or more aggregate expressions along with an optional filter. | Optional, required if no grouping sets. |
```proto
message AggregateRel {
  RelCommon common = 1;
  // Input of the aggregation
  Rel input = 2;
  // A list of zero or more grouping sets that the aggregation measures should
  // be calculated for. There must be at least one grouping set if there are no
  // measures (but it can be the empty grouping set).
  repeated Grouping groupings = 3;
  // A list of one or more aggregate expressions along with an optional filter.
  // Required if there are no groupings.
  repeated Measure measures = 4;
  // A list of zero or more grouping expressions that grouping sets (i.e.,
  // `Grouping` messages in the `groupings` field) can reference. Each
  // expression in this list must be referred to by at least one
  // `Grouping.expression_references`.
  repeated Expression grouping_expressions = 5;
  substrait.extensions.AdvancedExtension advanced_extension = 10;

  message Grouping {
    // Deprecated in favor of `expression_references` below.
    repeated Expression grouping_expressions = 1 [deprecated = true];
    // A list of zero or more references to grouping expressions, i.e., indices
    // into the `grouping_expression` list.
    repeated uint32 expression_references = 2;
  }

  message Measure {
    AggregateFunction measure = 1;
    // An optional boolean expression that acts to filter which records are
    // included in the measure. True means include this record for calculation
    // within the measure.
    // Helps to support SUM(<c>) FILTER(WHERE...) syntax without masking
    // opportunities for optimization.
    Expression filter = 2;
  }
}
```
Reference Operator¶
The reference operator is used to construct DAGs of operations. In a `Plan` we can have multiple `Rel` representing various computations with potentially multiple outputs. The `ReferenceRel` is used to express the fact that multiple `Rel` might be sharing subtrees of computation. This can be used to express arbitrary DAGs as well as represent multi-query optimizations.

As a concrete example, consider two queries `SELECT * FROM A JOIN B JOIN C` and `SELECT * FROM A JOIN B JOIN D`. We could use the `ReferenceRel` to highlight the shared `A JOIN B` between the two queries by creating a plan with 3 `Rel`: one expressing `A JOIN B` (in position 0 in the plan), one using a reference as follows: `ReferenceRel(0) JOIN C`, and a third one doing `ReferenceRel(0) JOIN D`. This avoids the redundancy of `A JOIN B`.
Signature | Value |
---|---|
Inputs | 1 |
Outputs | 1 |
Property Maintenance | Maintains all properties of the input |
Direct Output Order | Maintains order |
Reference Properties¶
Property | Description | Required |
---|---|---|
Referred Rel | A zero-indexed positional reference to a `Rel` defined within the same `Plan`. | Required |
```proto
message ReferenceRel {
  int32 subtree_ordinal = 1;
}
```
Write Operator¶
The write operator is an operator that consumes one input and writes it to storage. This can range from writing to a Parquet file, to INSERT/DELETE/UPDATE in a database.
Signature | Value |
---|---|
Inputs | 1 |
Outputs | 1 |
Property Maintenance | Output depends on OutputMode (none, or modified records) |
Direct Output Order | Unchanged from input |
Write Properties¶
Property | Description | Required |
---|---|---|
Write Type | Definition of which object we are operating on (e.g., a fully-qualified table name). | Required |
CTAS Schema | The names of all the columns and their type for a CREATE TABLE AS. | Required only for CTAS |
Write Operator | Which type of operation we are performing (INSERT/DELETE/UPDATE/CTAS). | Required |
Rel Input | The Rel representing which records we will be operating on (e.g., VALUES for an INSERT, or which records to DELETE, or records and after-image of their values for UPDATE). | Required |
Create Mode | This determines what should happen if the table already exists (ERROR/REPLACE/IGNORE) | Required only for CTAS |
Output Mode | For views that modify a DB it is important to control which records to “return”. The common default is NO_OUTPUT, where we return nothing. Alternatively, we can return MODIFIED_RECORDS, which can be further manipulated by layering more rels on top of this WriteRel (e.g., to “count how many records were updated”). This also allows returning the after-image of the change. To return the before-image (or both) one can use the reference mechanisms and have multiple return values. | Required for VIEW CREATE/CREATE_OR_REPLACE/ALTER |
Write Definition Types¶
Adding new Write Definition Types
If you have a write definition that’s not covered here, see the process for adding new write definition types.
Write definition types are built by the community and added to the specification.
message WriteRel {
// Definition of which TABLE we are operating on
oneof write_type {
NamedObjectWrite named_table = 1;
ExtensionObject extension_table = 2;
}
// The schema of the table (must align with Rel input (e.g., number of leaf fields must match))
NamedStruct table_schema = 3;
// The type of operation to perform
WriteOp op = 4;
// The relation that determines the records to add/remove/modify;
// its schema must match table_schema. Default values must be explicitly stated
// in a ProjectRel at the top of the input. The match must also
// hold in case of DELETE to ensure multi-engine plans are unambiguous.
Rel input = 5;
CreateMode create_mode = 8; // Used with CTAS to determine what to do if the table already exists
// Output mode determines what is the output of executing this rel
OutputMode output = 6;
RelCommon common = 7;
enum WriteOp {
WRITE_OP_UNSPECIFIED = 0;
// The insert of new records in a table
WRITE_OP_INSERT = 1;
// The removal of records from a table
WRITE_OP_DELETE = 2;
// The modification of existing records within a table
WRITE_OP_UPDATE = 3;
// The creation of a new table, and the insert of new records into the table
WRITE_OP_CTAS = 4;
}
enum CreateMode {
CREATE_MODE_UNSPECIFIED = 0;
CREATE_MODE_APPEND_IF_EXISTS = 1; // Append the data to the table if it already exists
CREATE_MODE_REPLACE_IF_EXISTS = 2; // Replace the table if it already exists ("OR REPLACE")
CREATE_MODE_IGNORE_IF_EXISTS = 3; // Ignore the request if the table already exists ("IF NOT EXISTS")
CREATE_MODE_ERROR_IF_EXISTS = 4; // Throw an error if the table already exists (default behavior)
}
enum OutputMode {
OUTPUT_MODE_UNSPECIFIED = 0;
// return no records at all
OUTPUT_MODE_NO_OUTPUT = 1;
// this mode makes the operator return all the records INSERTED/DELETED/UPDATED by the operator.
// The operator returns the AFTER-image of any change. This can be further manipulated by operators upstream
// (e.g., returning the typical "count of modified records").
// For scenarios in which the BEFORE image is required, the user must implement a spool (via references to
// subplans in the body of the Rel input) and return those with another PlanRel.relations.
OUTPUT_MODE_MODIFIED_RECORDS = 2;
}
}
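The interaction of the `CreateMode` values above with an already-existing table can be sketched against a toy catalog (a dict of table name to rows; all names and structure here are illustrative, not spec-defined):

```python
# Toy model of CreateMode handling for CTAS against a simple catalog.
# Mirrors the enum semantics in WriteRel.

APPEND_IF_EXISTS = "APPEND_IF_EXISTS"
REPLACE_IF_EXISTS = "REPLACE_IF_EXISTS"
IGNORE_IF_EXISTS = "IGNORE_IF_EXISTS"
ERROR_IF_EXISTS = "ERROR_IF_EXISTS"

def ctas(catalog, name, rows, create_mode=ERROR_IF_EXISTS):
    if name in catalog:
        if create_mode == ERROR_IF_EXISTS:
            raise ValueError(f"table {name!r} already exists")
        if create_mode == IGNORE_IF_EXISTS:
            return  # leave the existing table untouched
        if create_mode == APPEND_IF_EXISTS:
            catalog[name].extend(rows)
            return
        # REPLACE_IF_EXISTS falls through and overwrites (DROP + CREATE)
    catalog[name] = list(rows)

catalog = {"t": [1, 2]}
ctas(catalog, "t", [3], APPEND_IF_EXISTS)   # t -> [1, 2, 3]
ctas(catalog, "t", [9], REPLACE_IF_EXISTS)  # t -> [9]
ctas(catalog, "t", [7], IGNORE_IF_EXISTS)   # t unchanged -> [9]
```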
Virtual Table¶
Property | Description | Required |
---|---|---|
Name | The in-memory name to give the dataset. | Required |
Pin | Whether the dataset may be removed from memory or must be kept in memory. | Optional, defaults to false. |
Files Type¶
Property | Description | Required |
---|---|---|
Path | A URI to write the data to. Supports the inclusion of field references that are listed as available in properties as a “rotation description field”. | Required |
Format | Enumeration of available formats. The only current option is PARQUET. | Required |
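A rotation description field lets the output path vary with record contents. A minimal sketch of such substitution (the template syntax and field name here are purely illustrative; the spec does not mandate one):

```python
# Illustrative path templating: substitute a "rotation" field value into the
# output URI so that, e.g., each day's records land in their own file.
def render_path(template, record):
    return template.format(**record)

path = render_path("s3://bucket/logs/{date}/part.parquet", {"date": "2024-01-15"})
```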
Update Operator¶
The update operator applies a set of column transformations on a named table and writes the result back to storage.
Signature | Value |
---|---|
Inputs | 0 |
Outputs | 1 |
Property Maintenance | Output is number of modified records |
Update Properties¶
Property | Description | Required |
---|---|---|
Update Type | Definition of which object we are operating on (e.g., a fully-qualified table name). | Required |
Table Schema | The names and types of all the columns of the input table | Required |
Update Condition | The condition that must be met for a record to be updated. | Required |
Update Transformations | The set of column updates to be applied to the table. | Required |
message UpdateRel {
oneof update_type {
NamedTable named_table = 1;
}
NamedStruct table_schema = 2; // The full schema of the named_table
Expression condition = 3; // condition to be met for the update to be applied on a record
// The list of transformations to apply to the columns of the named_table
repeated TransformExpression transformations = 4;
message TransformExpression {
Expression transformation = 1; // the transformation to apply
int32 column_target = 2; // index of the column to apply the transformation to
}
}
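The update semantics above can be sketched as a toy evaluator: each record that satisfies the condition has every transformation applied to its target column, mirroring `TransformExpression`'s `column_target` (all names here are illustrative):

```python
# Toy evaluator for UpdateRel: `condition` is a predicate over a record (a list
# of column values); each transformation is a (column_index, fn) pair applied
# in place whenever the condition holds.

def apply_update(table, condition, transformations):
    for record in table:
        if condition(record):
            for column_target, fn in transformations:
                record[column_target] = fn(record[column_target])
    return table

# Sketch of: UPDATE t SET price = price * 1.1 WHERE qty > 10
# with columns [qty, price]
rows = [[5, 100.0], [20, 200.0]]
apply_update(rows, lambda r: r[0] > 10, [(1, lambda v: v * 1.1)])
```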
DDL (Data Definition Language) Operator¶
The operator that defines modifications to a database schema (CREATE/DROP/ALTER for TABLEs and VIEWs).
Signature | Value |
---|---|
Inputs | 1 |
Outputs | 0 |
Property Maintenance | N/A (no output) |
Direct Output Order | N/A |
DDL Properties¶
Property | Description | Required |
---|---|---|
Write Type | Definition of which type of object we are operating on. | Required |
Table Schema | The names of all the columns and their type. | Required (except for DROP operations) |
Table Defaults | The set of default values for this table. | Required (except for DROP operations) |
DDL Object | Which type of object we are operating on (e.g., TABLE or VIEW). | Required |
DDL Operator | The operation to be performed (e.g., CREATE/ALTER/DROP). | Required |
View Definition | A Rel representing the “body” of a VIEW. | Required for VIEW CREATE/CREATE_OR_REPLACE/ALTER |
message DdlRel {
// Definition of which type of object we are operating on
oneof write_type {
NamedObjectWrite named_object = 1;
ExtensionObject extension_object = 2;
}
// The columns that will be modified (representing after-image of a schema change)
NamedStruct table_schema = 3;
// The default values for the columns (representing after-image of a schema change)
// E.g., in case of an ALTER TABLE that changes some of the column default values, we expect
// the table_defaults Struct to report a full list of default values reflecting the result of applying
// the ALTER TABLE operator successfully
Expression.Literal.Struct table_defaults = 4;
// Which type of object we operate on
DdlObject object = 5;
// The type of operation to perform
DdlOp op = 6;
// The body of the CREATE VIEW
Rel view_definition = 7;
RelCommon common = 8;
enum DdlObject {
DDL_OBJECT_UNSPECIFIED = 0;
// A Table object in the system
DDL_OBJECT_TABLE = 1;
// A View object in the system
DDL_OBJECT_VIEW = 2;
}
enum DdlOp {
DDL_OP_UNSPECIFIED = 0;
// A create operation (for any object)
DDL_OP_CREATE = 1;
// A create operation if the object does not exist, or replaces it (equivalent to a DROP + CREATE) if the object already exists
DDL_OP_CREATE_OR_REPLACE = 2;
// An operation that modifies the schema (e.g., column names, types, default values) for the target object
DDL_OP_ALTER = 3;
// An operation that removes an object from the system
DDL_OP_DROP = 4;
// An operation that removes an object from the system (without throwing an exception if the object did not exist)
DDL_OP_DROP_IF_EXIST = 5;
}
//TODO add PK/constraints/indexes/etc..?
}
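The `DdlOp` variants can similarly be sketched against a toy catalog (a dict mapping object name to its definition; all names here are illustrative):

```python
# Toy catalog model of the DdlOp semantics: CREATE errors on an existing object,
# CREATE_OR_REPLACE overwrites (equivalent to DROP + CREATE), DROP errors on a
# missing object, DROP_IF_EXIST does not.

def ddl(catalog, op, name, definition=None):
    if op == "CREATE":
        if name in catalog:
            raise ValueError(f"{name!r} already exists")
        catalog[name] = definition
    elif op == "CREATE_OR_REPLACE":
        catalog[name] = definition  # overwrite whether or not it exists
    elif op == "DROP":
        if name not in catalog:
            raise ValueError(f"{name!r} does not exist")
        del catalog[name]
    elif op == "DROP_IF_EXIST":
        catalog.pop(name, None)  # silently a no-op if absent

catalog = {}
ddl(catalog, "CREATE", "v", "SELECT 1")
ddl(catalog, "CREATE_OR_REPLACE", "v", "SELECT 2")
ddl(catalog, "DROP_IF_EXIST", "missing")  # no error
```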
Discussion Points
- How should correlated operations be handled?