
Logical Relations

Read Operator

The read operator is an operator that produces one output. A simple example would be the reading of a Parquet file. It is expected that many types of reads will be added over time.

| Signature | Value |
| --- | --- |
| Inputs | 0 |
| Outputs | 1 |
| Property Maintenance | N/A (no inputs) |
| Direct Output Order | Defaults to the schema of the data read after the optional projection (masked complex expression) is applied. |

Read Properties

| Property | Description | Required |
| --- | --- | --- |
| Definition | The contents of the read property definition. | Required |
| Direct Schema | Defines the schema of the output of the read (before any projection or emit remapping/hiding). | Required |
| Filter | A boolean Substrait expression that describes a filter that must be applied to the data. The filter should be interpreted against the direct schema. | Optional, defaults to none. |
| Best Effort Filter | A boolean Substrait expression that describes a filter that may be applied to the data. The filter should be interpreted against the direct schema. | Optional, defaults to none. |
| Projection | A masked complex expression describing the portions of the content that should be read. | Optional, defaults to all of schema. |
| Output Properties | Declaration of orderedness and/or distribution properties this read produces. | Optional, defaults to no properties. |
| Properties | A list of name/value pairs associated with the read. | Optional, defaults to empty. |

Read Filtering

The read relation has two different filter properties: a filter, which must be satisfied by the operator, and a best effort filter, which does not have to be satisfied. This reflects the way that consumers are often implemented. A consumer is often only able to fully apply a limited set of operations in the scan. There can then be an extended set of operations which a consumer can apply in a best effort fashion. A producer, when setting these two fields, should take care to only use expressions that the consumer is capable of handling.

As an example, a consumer may only be able to fully apply (in the read relation) <, =, and > on integral types. The consumer may be able to apply <, =, and > in a best effort fashion on decimal and string types. Consider the filter expression my_int < 10 && my_string < "x" && upper(my_string) > "B". In this case the filter should be set to my_int < 10, the best_effort_filter should be set to my_string < "x", and the remaining portion (upper(my_string) > "B") should be put into a filter relation. Because the consumer is not required to apply the best effort filter, that filter relation must also re-check my_string < "x" if exact results are required.
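The producer-side split described above can be sketched in Python. This is a minimal sketch, not part of Substrait: the `Conjunct` class, the capability tables, and `split_filter` are hypothetical names, and the capabilities simply mirror the example consumer. Since a consumer may skip a best effort filter, the sketch keeps those predicates in the downstream filter relation as well.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Conjunct:
    """Hypothetical summary of one AND-ed term of a filter expression."""
    text: str
    column_type: str   # type category of the compared value
    op: str            # comparison operator
    bare_column: bool  # True if the compared side is a plain field reference


# Assumed consumer capabilities, taken from the example in the text.
FULL_OPS = {"integral": {"<", "=", ">"}}
BEST_EFFORT_OPS = {"decimal": {"<", "=", ">"}, "string": {"<", "=", ">"}}


def split_filter(conjuncts):
    """Return (filter, best_effort_filter, downstream filter relation) terms."""
    required, best_effort, downstream = [], [], []
    for c in conjuncts:
        if c.bare_column and c.op in FULL_OPS.get(c.column_type, set()):
            required.append(c.text)
        elif c.bare_column and c.op in BEST_EFFORT_OPS.get(c.column_type, set()):
            best_effort.append(c.text)
            # The consumer may skip this predicate, so re-check it downstream.
            downstream.append(c.text)
        else:
            downstream.append(c.text)
    return required, best_effort, downstream


terms = [
    Conjunct("my_int < 10", "integral", "<", True),
    Conjunct('my_string < "x"', "string", "<", True),
    Conjunct('upper(my_string) > "B"', "string", ">", False),
]
print(split_filter(terms))
```

Here `upper(my_string) > "B"` is not a bare column comparison, so it lands only in the downstream filter relation.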

A filter expression must be interpreted against the direct schema before the projection expression has been applied. As a result, fields may be referenced by the filter expression which are not included in the relation’s output.

Read Definition Types

Adding new Read Definition Types

If you have a read definition that’s not covered here, see the process for adding new read definition types.

Read definition types (like the rest of the features in Substrait) are built by the community and added to the specification.

Virtual Table

A virtual table is a table whose contents are embedded in the plan itself. The table data is encoded as records consisting of literal values or expressions that can be resolved without referencing any input data, such as a literal, a function call involving only literals, or any other expression that does not require input.

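| Property | Description | Required |
| --- | --- | --- |
| Data | The records of the table, each made up of literals or expressions that can be resolved without referencing input data. | Required |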

Named Table

A named table is a reference to data defined elsewhere. For example, there may be a catalog of tables with unique names that both the producer and consumer agree on. This catalog would provide the consumer with more information on how to retrieve the data.

| Property | Description | Required |
| --- | --- | --- |
| Names | A list of namespaced strings that, together, form the table name. | Required (at least one) |

Files Type

| Property | Description | Required |
| --- | --- | --- |
| Items | An array of items (path or path glob) associated with the read. | Required |
| Format per item | The file format and its read options: Parquet, Arrow, ORC, DWRF, delimiter-separated text, or an extension format. | Required |
| Slicing parameters per item | Information to use when reading a slice of a file. | Optional |

Slicing Files

A read operation is allowed to only read part of a file. This is convenient, for example, when distributing a read operation across several nodes. The slicing parameters are specified as byte offsets into the file.

Many file formats consist of indivisible “chunks” of data (e.g. Parquet row groups). When a slice boundary falls inside such a chunk, the consumer must decide which slice the chunk belongs to. For example, one possible approach is that a chunk should only be read if the midpoint of the chunk (dividing by 2 and rounding down) is contained within the asked-for byte range.
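The midpoint rule can be sketched in a few lines of Python. This assumes each chunk is known as a hypothetical `(byte_offset, byte_size)` pair and treats the slice (the `start` and `length` fields of `FileOrFiles`) as the half-open range `[start, start + length)`; `chunks_for_slice` is an illustrative name, not a Substrait API.

```python
from typing import List, Tuple


def chunks_for_slice(chunks: List[Tuple[int, int]], start: int, length: int) -> List[int]:
    """Indices of the chunks this slice should read, using the midpoint rule.

    `chunks` holds (byte_offset, byte_size) pairs, e.g. Parquet row groups.
    The slice covers the half-open byte range [start, start + length).
    """
    end = start + length
    selected = []
    for index, (offset, size) in enumerate(chunks):
        midpoint = offset + size // 2  # divide by 2, rounding down
        if start <= midpoint < end:
            selected.append(index)
    return selected


row_groups = [(0, 100), (100, 100), (200, 100)]  # hypothetical file layout
print(chunks_for_slice(row_groups, start=0, length=150))    # [0]
print(chunks_for_slice(row_groups, start=150, length=150))  # [1, 2]
```

Because every midpoint falls in exactly one of a set of adjacent, non-overlapping slices, each chunk is read exactly once even though the slice boundary at byte 150 cuts through the second row group.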

message ReadRel {
  RelCommon common = 1;
  NamedStruct base_schema = 2;
  Expression filter = 3;
  Expression best_effort_filter = 11;
  Expression.MaskExpression projection = 4;
  substrait.extensions.AdvancedExtension advanced_extension = 10;

  // Definition of which type of scan operation is to be performed
  oneof read_type {
    VirtualTable virtual_table = 5;
    LocalFiles local_files = 6;
    NamedTable named_table = 7;
    ExtensionTable extension_table = 8;
    IcebergTable iceberg_table = 9;
  }

  // A base table. The list of strings is used to represent namespacing (e.g., mydb.mytable).
  // This assumes shared catalog between systems exchanging a message.
  message NamedTable {
    repeated string names = 1;
    substrait.extensions.AdvancedExtension advanced_extension = 10;
  }

  // Read an Iceberg Table
  message IcebergTable {
    oneof table_type {
      MetadataFileRead direct = 1;
      // future: add catalog table types (e.g. rest api, latest metadata in path, etc)
    }

    // Read an Iceberg table using a metadata file. Implicit assumption: required credentials are already known by plan consumer.
    message MetadataFileRead {
      // the specific uri of a metadata file (e.g. s3://mybucket/mytable/<ver>-<uuid>.metadata.json)
      string metadata_uri = 1;

      // snapshot options. if none set, uses the current snapshot listed in the metadata file
      oneof snapshot {
        // the snapshot id to read.
        string snapshot_id = 2;

        // the timestamp that should be used to select the snapshot (Time passed in microseconds since 1970-01-01 00:00:00.000000 in UTC)
        int64 snapshot_timestamp = 3;
      }
    }
  }

  // A table composed of expressions.
  message VirtualTable {
    repeated Expression.Literal.Struct values = 1 [deprecated = true];
    repeated Expression.Nested.Struct expressions = 2;
  }

  // A stub type that can be used to extend/introduce new table types outside
  // the specification.
  message ExtensionTable {
    google.protobuf.Any detail = 1;
  }

  // Represents a list of files in input of a scan operation
  message LocalFiles {
    repeated FileOrFiles items = 1;
    substrait.extensions.AdvancedExtension advanced_extension = 10;

    // Many files consist of indivisible chunks (e.g. parquet row groups
    // or CSV rows).  If a slice partially selects an indivisible chunk
    // then the consumer should employ some rule to decide which slice to
    // include the chunk in (e.g. include it in the slice that contains
    // the midpoint of the chunk)
    message FileOrFiles {
      oneof path_type {
        // A URI that can refer to either a single folder or a single file
        string uri_path = 1;
        // A URI where the path portion is a glob expression that can
        // identify zero or more paths.
        // Consumers should support the POSIX syntax.  The recursive
        // globstar (**) may not be supported.
        string uri_path_glob = 2;
        // A URI that refers to a single file
        string uri_file = 3;
        // A URI that refers to a single folder
        string uri_folder = 4;
      }

      // Original file format enum, superseded by the file_format oneof.
      reserved 5;
      reserved "format";

      // The index of the partition this item belongs to
      uint64 partition_index = 6;

      // The start position in bytes to read from this item
      uint64 start = 7;

      // The length in bytes to read from this item
      uint64 length = 8;

      message ParquetReadOptions {}
      message ArrowReadOptions {}
      message OrcReadOptions {}
      message DwrfReadOptions {}
      message DelimiterSeparatedTextReadOptions {
        // Delimiter separated files may be compressed.  The reader should
        // autodetect this and decompress as needed.

        // The character(s) used to separate fields.  Common values are comma,
        // tab, and pipe.  Multiple characters are allowed.
        string field_delimiter = 1;
        // The maximum number of bytes to read from a single line.  If a line
        // exceeds this limit the resulting behavior is undefined.
        uint64 max_line_size = 2;
        // The character(s) used to quote strings.  Common values are single
        // and double quotation marks.
        string quote = 3;
        // The number of lines to skip at the beginning of the file.
        uint64 header_lines_to_skip = 4;
        // The character used to escape characters in strings.  Backslash is
        // a common value.  Note that a double quote mark can also be used as an
        // escape character but the external quotes should be removed first.
        string escape = 5;
        // If this value is encountered (including empty string), the resulting
        // value is null instead.  Leave unset to disable.  If this value is
        // provided, the effective schema of this file is comprised entirely of
        // nullable strings.  If not provided, the effective schema is instead
        // made up of non-nullable strings.
        optional string value_treated_as_null = 6;
      }

      // The format of the files along with options for reading those files.
      oneof file_format {
        ParquetReadOptions parquet = 9;
        ArrowReadOptions arrow = 10;
        OrcReadOptions orc = 11;
        google.protobuf.Any extension = 12;
        DwrfReadOptions dwrf = 13;
        DelimiterSeparatedTextReadOptions text = 14;
      }
    }
  }

}

Iceberg Table Type

An Iceberg table is a table built on Apache Iceberg. Iceberg tables can be read either by directly reading a metadata file or by consulting a catalog.

Metadata File Reading

Points to an Iceberg metadata file and uses that as a starting point for reading an Iceberg table. This is the simplest form of Iceberg table access, but it should be limited to reads. (Writes often also need to update an external catalog.)

| Property | Description | Required |
| --- | --- | --- |
| metadata_uri | A URI for an Iceberg metadata file. The snapshot to read is resolved from this file. | Required |
| snapshot_id | The ID of the snapshot to read. If neither snapshot_id nor snapshot_timestamp is provided, the current snapshot is read. Only one of snapshot_id or snapshot_timestamp may be set. | Optional |
| snapshot_timestamp | The timestamp used to select the snapshot to read, in microseconds since 1970-01-01 00:00:00.000000 UTC. If neither snapshot_id nor snapshot_timestamp is provided, the current snapshot is read. | Optional |

Filter Operation

The filter operator eliminates one or more records from the input data based on a boolean filter expression.

| Signature | Value |
| --- | --- |
| Inputs | 1 |
| Outputs | 1 |
| Property Maintenance | Orderedness, Distribution, remapped by emit |
| Direct Output Order | The field order of the input. |

Filter Properties

| Property | Description | Required |
| --- | --- | --- |
| Input | The relational input. | Required |
| Expression | A boolean expression which describes which records are included/excluded. | Required |

message FilterRel {
  RelCommon common = 1;
  Rel input = 2;
  Expression condition = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;

}

Sort Operation

The sort operator reorders a dataset based on one or more identified sort fields and a sorting function for each.

| Signature | Value |
| --- | --- |
| Inputs | 1 |
| Outputs | 1 |
| Property Maintenance | Will update orderedness property to the output of the sort operation. Distribution property only remapped based on emit. |
| Direct Output Order | The field order of the input. |

Sort Properties

| Property | Description | Required |
| --- | --- | --- |
| Input | The relational input. | Required |
| Sort Fields | List of one or more fields to sort by. Uses the same properties as the orderedness property. | One sort field required |

message SortRel {
  RelCommon common = 1;
  Rel input = 2;
  repeated SortField sorts = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;

}

Project Operation

The project operation will produce one or more additional expressions based on the inputs of the relation.

| Signature | Value |
| --- | --- |
| Inputs | 1 |
| Outputs | 1 |
| Property Maintenance | Distribution maintained, mapped by emit. Orderedness: Maintained if no window operations. Extended to include projection fields if fields are direct references. If window operations are present, no orderedness is maintained. |
| Direct Output Order | The field order of the input + the list of new expressions in the order they are declared in the expressions list. |
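The direct output order of a project can be illustrated with a small Python sketch. It assumes rows are plain tuples and models each expression as a hypothetical Python callable over the input row; emit remapping is not modeled.

```python
from typing import Any, Callable, List, Sequence, Tuple

Row = Tuple[Any, ...]
Expression = Callable[[Row], Any]  # hypothetical stand-in for a Substrait expression


def project(rows: Sequence[Row], expressions: List[Expression]) -> List[Row]:
    """Direct output: every input field, then each new expression in declaration order."""
    return [tuple(row) + tuple(expr(row) for expr in expressions) for row in rows]


rows = [(1, 2), (3, 4)]
print(project(rows, [lambda r: r[0] + r[1], lambda r: r[0] * 10]))
# [(1, 2, 3, 10), (3, 4, 7, 30)]
```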

Project Properties

| Property | Description | Required |
| --- | --- | --- |
| Input | The relational input. | Required |
| Expressions | List of one or more expressions to add to the input. | At least one expression required |

message ProjectRel {
  RelCommon common = 1;
  Rel input = 2;
  repeated Expression expressions = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;

}

Cross Product Operation

The cross product operation will combine two separate inputs into a single output. It pairs every record from the left input with every record of the right input.

| Signature | Value |
| --- | --- |
| Inputs | 2 |
| Outputs | 1 |
| Property Maintenance | Distribution is maintained. Orderedness is empty post operation. |
| Direct Output Order | The emit order of the left input followed by the emit order of the right input. |

Cross Product Properties

| Property | Description | Required |
| --- | --- | --- |
| Left Input | A relational input. | Required |
| Right Input | A relational input. | Required |

message CrossRel {
  RelCommon common = 1;
  Rel left = 2;
  Rel right = 3;

  substrait.extensions.AdvancedExtension advanced_extension = 10;

}

Join Operation

The join operation will combine two separate inputs into a single output, based on a join expression. A common subtype of joins is an equality join where the join expression is constrained to a list of equality (or equality + null equality) conditions between the two inputs of the join.

| Signature | Value |
| --- | --- |
| Inputs | 2 |
| Outputs | 1 |
| Property Maintenance | Distribution is maintained. Orderedness is empty post operation. Physical relations may provide better property maintenance. |
| Direct Output Order | The emit order of the left input followed by the emit order of the right input. |

Join Properties

| Property | Description | Required |
| --- | --- | --- |
| Left Input | A relational input. | Required |
| Right Input | A relational input. | Required |
| Join Expression | A boolean condition that describes whether each record from the left set “matches” the record from the right set. Field references correspond to the direct output order of the data. | Required. Can be the literal True. |
| Post-Join Filter | A boolean condition to be applied to each result record after the inputs have been joined, yielding only the records that satisfied the condition. | Optional |
| Join Type | One of the join types defined below. | Required |

Join Types

| Type | Description |
| --- | --- |
| Inner | Return records from the left side only if they match the right side. Return records from the right side only when they match the left side. For each cross input match, return a record including the data from both sides. Non-matching records are ignored. |
| Outer | Return all records from both the left and right inputs. For each cross input match, return a record including the data from both sides. For any remaining non-matching records, return the record from the corresponding input along with nulls for the opposite input. |
| Left | Return all records from the left input. For each cross input match, return a record including the data from both sides. For any remaining non-matching records from the left input, return the left record along with nulls for the right input. |
| Right | Return all records from the right input. For each cross input match, return a record including the data from both sides. For any remaining non-matching records from the right input, return the right record along with nulls for the left input. |
| Left Semi | Return records from the left input. These are returned only if the records have a join partner on the right side. |
| Right Semi | Return records from the right input. These are returned only if the records have a join partner on the left side. |
| Left Anti | Return records from the left input. These are returned only if the records do not have a join partner on the right side. |
| Right Anti | Return records from the right input. These are returned only if the records do not have a join partner on the left side. |
| Left Single | Return all records from the left input with no join expansion. If at least one record from the right input matches the left, return one arbitrary matching record from the right input. For any left records without matching right records, return the left record along with nulls for the right input. Similar to a left outer join but only returns one right match at most. Useful for nested subqueries where we need exactly one record in the output (or an exception). See Section 3.2 of https://15721.courses.cs.cmu.edu/spring2018/papers/16-optimizer2/hyperjoins-btw2017.pdf for more information. |
| Right Single | Same as left single except that the right and left inputs are switched. |
| Left Mark | Return one record for each record from the left input. Appends one additional “mark” column to the output of the join. The new column will be listed after all columns from both sides and will be of type nullable boolean. If there is at least one join partner in the right input where the join condition evaluates to true, then the mark column will be set to true. Otherwise, if there is at least one join partner in the right input where the join condition evaluates to NULL, then the mark column will be set to NULL. Otherwise the mark column will be set to false. |
| Right Mark | Return one record for each record from the right input. Appends one additional “mark” column to the output of the join. The new column will be listed after all columns from both sides and will be of type nullable boolean. If there is at least one join partner in the left input where the join condition evaluates to true, then the mark column will be set to true. Otherwise, if there is at least one join partner in the left input where the join condition evaluates to NULL, then the mark column will be set to NULL. Otherwise the mark column will be set to false. |

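To make the differences between the left-driven join types concrete, here is a naive nested-loop sketch in Python. It is illustrative only: the function names and string join-type labels are hypothetical, SQL NULL is modeled as `None`, "one arbitrary matching record" for left single is modeled as the first match, and the mark value is computed by a separate helper so the sketch does not commit to a column layout. The right-driven types are the mirror image.

```python
from typing import Callable, List, Optional, Sequence, Tuple

Row = Tuple
# Join condition with three-valued logic: True, False, or None (SQL NULL).
Cond = Callable[[Row, Row], Optional[bool]]


def join(left: Sequence[Row], right: Sequence[Row], cond: Cond,
         join_type: str, right_width: int) -> List[Row]:
    """Nested-loop sketch of a few left-driven join types."""
    null_right = (None,) * right_width
    out: List[Row] = []
    for left_row in left:
        # Only a condition that evaluates to True counts as a match.
        matches = [r for r in right if cond(left_row, r) is True]
        if join_type == "inner":
            out.extend(left_row + r for r in matches)
        elif join_type == "left":
            out.extend(left_row + r for r in matches)
            if not matches:
                out.append(left_row + null_right)
        elif join_type == "left_semi":
            if matches:
                out.append(left_row)
        elif join_type == "left_anti":
            if not matches:
                out.append(left_row)
        elif join_type == "left_single":
            # At most one right match; "arbitrary" is modeled as the first one.
            out.append(left_row + (matches[0] if matches else null_right))
        else:
            raise ValueError(f"unsupported join type: {join_type}")
    return out


def left_mark_value(left_row: Row, right: Sequence[Row], cond: Cond) -> Optional[bool]:
    """Mark for one left record: True if any match, else NULL if any NULL, else False."""
    results = [cond(left_row, r) for r in right]
    if any(res is True for res in results):
        return True
    if any(res is None for res in results):
        return None
    return False


def eq(l: Row, r: Row) -> Optional[bool]:
    """Equality on the first field, returning None when either side is NULL."""
    return None if l[0] is None or r[0] is None else l[0] == r[0]


left, right = [(1,), (2,), (None,)], [(1,), (3,)]
print(join(left, right, eq, "left", right_width=1))
# [(1, 1), (2, None), (None, None)]
```

Note how the NULL-keyed left record is kept by the anti join (it has no partner) but gets a NULL mark rather than false.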
message JoinRel {
  RelCommon common = 1;
  Rel left = 2;
  Rel right = 3;
  Expression expression = 4;
  Expression post_join_filter = 5;

  JoinType type = 6;

  enum JoinType {
    JOIN_TYPE_UNSPECIFIED = 0;
    JOIN_TYPE_INNER = 1;
    JOIN_TYPE_OUTER = 2;
    JOIN_TYPE_LEFT = 3;
    JOIN_TYPE_RIGHT = 4;
    JOIN_TYPE_LEFT_SEMI = 5;
    JOIN_TYPE_LEFT_ANTI = 6;
    JOIN_TYPE_LEFT_SINGLE = 7;
    JOIN_TYPE_RIGHT_SEMI = 8;
    JOIN_TYPE_RIGHT_ANTI = 9;
    JOIN_TYPE_RIGHT_SINGLE = 10;
    JOIN_TYPE_LEFT_MARK = 11;
    JOIN_TYPE_RIGHT_MARK = 12;
  }

  substrait.extensions.AdvancedExtension advanced_extension = 10;

}

Set Operation

The set operation encompasses several set-level operations that support combining datasets, possibly excluding records based on various types of record level matching.

| Signature | Value |
| --- | --- |
| Inputs | 2 or more |
| Outputs | 1 |
| Property Maintenance | Maintains distribution if all inputs have the same ordinal distribution. Orderedness is not maintained. |
| Direct Output Order | The field order of the inputs. All inputs must have identical field types, but field nullabilities may vary. |

Set Properties

| Property | Description | Required |
| --- | --- | --- |
| Primary Input | The primary input of the relation. | Required |
| Secondary Inputs | One or more relational inputs. | At least one required |
| Set Operation Type | From list below. | Required |

Set Operation Types

The set operation type determines both the records that are emitted and the type of the output record.

For some set operations, whether a specific record is included in the output and whether it appears more than once depends on the number of times it occurs across all inputs. In the following table, treat:

* m: the number of times a record occurs in the primary input (p)
* n1: the number of times a record occurs in the 1st secondary input (s1)
* n2: the number of times a record occurs in the 2nd secondary input (s2)
* …
* n: the number of times a record occurs in the nth secondary input

Operation Description Examples Output Nullability
Minus (Primary) Returns all records from the primary input excluding any matching rows from secondary inputs, removing duplicates.
Each value is treated as a unique member of the set, so duplicates in the first set don’t affect the result.
This operation maps to SQL EXCEPT DISTINCT.
MINUS
  p: {1, 2, 2, 3, 3, 3, 4}
  s1: {1, 2}
  s2: {3}
YIELDS
{4}
The same as the primary input.
Minus (Primary All) Returns all records from the primary input excluding any matching records from secondary inputs.
For each specific record returned, the output contains max(0, m - sum(n1, n2, …, n)) copies.
This operation maps to SQL EXCEPT ALL.
MINUS ALL
  p: {1, 2, 2, 3, 3, 3, 3}
  s1: {1, 2, 3, 4}
  s2: {3}
YIELDS
{2, 3, 3}
The same as the primary input.
Minus (Multiset) Returns all records from the primary input excluding any records that are included in all secondary inputs.
This operation does not have a direct SQL mapping.
MINUS MULTISET
  p: {1, 2, 3, 4}
  s1: {1, 2}
  s2: {1, 2, 3}
YIELDS
{3, 4}
The same as the primary input.
Intersection (Primary) Returns all records from the primary input that are present in any secondary input, removing duplicates.
This operation does not have a direct SQL mapping.
INTERSECT
  p: {1, 2, 2, 3, 3, 3, 4}
  s1: {1, 2, 3, 5}
  s2: {2, 3, 6}
YIELDS
{1, 2, 3}
If a field is nullable in the primary input and in any of the secondary inputs, it is nullable in the output.
Intersection (Multiset) Returns all records from the primary input that match at least one record from all secondary inputs.
This operation maps to SQL INTERSECT DISTINCT.
INTERSECT MULTISET
  p: {1, 2, 3, 4}
  s1: {2, 3}
  s2: {3, 4}
YIELDS
{3}
If a field is required in any of the inputs, it is required in the output.
Intersection (Multiset All) Returns all records from the primary input that are present in every secondary input.
For each specific record returned, the output contains min(m, n1, n2, …, n) copies.
This operation maps to SQL INTERSECT ALL.
INTERSECT ALL
  p: {1, 2, 2, 3, 3, 3, 4}
  s1: {1, 2, 3, 3, 5}
  s2: {2, 3, 3, 6}
YIELDS
{2, 3, 3}
If a field is required in any of the inputs, it is required in the output.
Union Distinct Returns all records from each set, removing duplicates.
This operation maps to SQL UNION DISTINCT.
UNION
  p: {1, 2, 2, 3, 3, 3, 4}
  s1: {2, 3, 5}
  s2: {1, 6}
YIELDS
{1, 2, 3, 4, 5, 6}
If a field is nullable in any of the inputs, it is nullable in the output.
Union All Returns all records from all inputs.
For each specific record returned, the output contains (m + n1 + n2 + … + n) copies.
This operation maps to SQL UNION ALL.
UNION ALL
  p: {1, 2, 2, 3, 3, 3, 4}
  s1: {2, 3, 5}
  s2: {1, 6}
YIELDS
{1, 2, 2, 3, 3, 3, 4, 2, 3, 5, 1, 6}
If a field is nullable in any of the inputs, it is nullable in the output.
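The record-count rules in the table above can be checked by modeling each input as a multiset. The Python sketch below does this with `collections.Counter`; the `set_op` function and its string operation labels are hypothetical, records are any hashable values, and output order is not significant (the sketch emits records in first-seen order). The text does not say whether Minus (Multiset) keeps duplicates from the primary input; this sketch assumes it does. Because Python treats `None == None` as true, the sketch also follows the rule that NULL matches NULL in set operations.

```python
from collections import Counter
from typing import Hashable, List, Sequence


def set_op(op: str, primary: Sequence[Hashable], *secondaries: Sequence[Hashable]) -> List[Hashable]:
    p = Counter(primary)                 # m for each distinct record
    s = [Counter(x) for x in secondaries]  # n1, n2, ... (missing records count as 0)
    if op == "minus_primary":
        return [v for v in p if all(c[v] == 0 for c in s)]
    if op == "minus_primary_all":        # max(0, m - (n1 + n2 + ...)) copies
        return [v for v in p for _ in range(max(0, p[v] - sum(c[v] for c in s)))]
    if op == "minus_multiset":           # assumption: primary duplicates are kept
        return [v for v in primary if not all(c[v] > 0 for c in s)]
    if op == "intersection_primary":     # present in any secondary, distinct
        return [v for v in p if any(c[v] > 0 for c in s)]
    if op == "intersection_multiset":    # present in every secondary, distinct
        return [v for v in p if all(c[v] > 0 for c in s)]
    if op == "intersection_multiset_all":  # min(m, n1, n2, ...) copies
        return [v for v in p for _ in range(min([p[v]] + [c[v] for c in s]))]
    everything = [*primary, *(v for x in secondaries for v in x)]
    if op == "union_distinct":
        return list(dict.fromkeys(everything))
    if op == "union_all":
        return everything
    raise ValueError(f"unknown set operation: {op}")


print(set_op("minus_primary_all", [1, 2, 2, 3, 3, 3, 3], [1, 2, 3, 4], [3]))  # [2, 3, 3]
```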

Note that for set operations, NULL matches NULL. That is:

{NULL, 1, 3} MINUS          {NULL, 2, 4} === (1), (3)
{NULL, 1, 3} INTERSECTION   {NULL, 2, 4} === (NULL)
{NULL, 1, 3} UNION DISTINCT {NULL, 2, 4} === (NULL), (1), (2), (3), (4)

Output Type Derivation Examples

Given the following inputs, where R is Required and N is Nullable:

Input 1: (R, R, R, R, N, N, N, N)  Primary Input
Input 2: (R, R, N, N, R, R, N, N)  Secondary Input
Input 3: (R, N, R, N, R, N, R, N)  Secondary Input

The output type is as follows for the various operations

| Operation | Output Type |
| --- | --- |
| Minus (Primary) | (R, R, R, R, N, N, N, N) |
| Minus (Primary All) | (R, R, R, R, N, N, N, N) |
| Minus (Multiset) | (R, R, R, R, N, N, N, N) |
| Intersection (Primary) | (R, R, R, R, R, N, N, N) |
| Intersection (Multiset) | (R, R, R, R, R, R, R, N) |
| Intersection (Multiset All) | (R, R, R, R, R, R, R, N) |
| Union Distinct | (R, N, N, N, N, N, N, N) |
| Union All | (R, N, N, N, N, N, N, N) |

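The nullability rules behind the derivation table can be expressed as a per-field function. In this Python sketch, `parse`, `render`, `output_nullability`, and the operation labels are hypothetical helpers; each input is a list of per-field nullable flags, with the primary input first.

```python
from typing import List, Sequence


def parse(spec: str) -> List[bool]:
    """'R' = required, 'N' = nullable; returns one nullable flag per field."""
    return [c == "N" for c in spec]


def render(flags: Sequence[bool]) -> str:
    return "".join("N" if f else "R" for f in flags)


def output_nullability(op: str, inputs: Sequence[Sequence[bool]]) -> List[bool]:
    """Per-field output nullability; inputs[0] is the primary input."""
    primary, secondaries = inputs[0], inputs[1:]
    out = []
    for i, p in enumerate(primary):
        sec = [s[i] for s in secondaries]
        if op.startswith("minus"):
            out.append(p)                # same as the primary input
        elif op == "intersection_primary":
            out.append(p and any(sec))   # nullable in primary and in some secondary
        elif op.startswith("intersection"):
            out.append(p and all(sec))   # required in any input -> required
        elif op.startswith("union"):
            out.append(p or any(sec))    # nullable in any input -> nullable
        else:
            raise ValueError(f"unknown set operation: {op}")
    return out


inputs = [parse("RRRRNNNN"), parse("RRNNRRNN"), parse("RNRNRNRN")]
print(render(output_nullability("intersection_primary", inputs)))  # RRRRRNNN
```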
message SetRel {
  RelCommon common = 1;
  // The first input is the primary input, the remaining are secondary
  // inputs.  There must be at least two inputs.
  repeated Rel inputs = 2;
  SetOp op = 3;
  substrait.extensions.AdvancedExtension advanced_extension = 10;

  enum SetOp {
    SET_OP_UNSPECIFIED = 0;
    SET_OP_MINUS_PRIMARY = 1;
    SET_OP_MINUS_PRIMARY_ALL = 7;
    SET_OP_MINUS_MULTISET = 2;
    SET_OP_INTERSECTION_PRIMARY = 3;
    SET_OP_INTERSECTION_MULTISET = 4;
    SET_OP_INTERSECTION_MULTISET_ALL = 8;
    SET_OP_UNION_DISTINCT = 5;
    SET_OP_UNION_ALL = 6;
  }

}

Fetch Operation

The fetch operation eliminates records outside a desired window. It typically corresponds to a fetch/offset SQL clause. It only returns records between the start offset and the end offset.

| Signature | Value |
| --- | --- |
| Inputs | 1 |
| Outputs | 1 |
| Property Maintenance | Maintains distribution and orderedness. |
| Direct Output Order | Unchanged from input. |

Fetch Properties

| Property | Description | Required |
| --- | --- | --- |
| Input | A relational input, typically with a desired orderedness property. | Required |
| Offset Expression | An expression which evaluates to a non-negative integer or null (recommended type is i64). Declares the offset for retrieval of records. An expression evaluating to null is treated as 0. | Optional, defaults to a 0 literal. |
| Count Expression | An expression which evaluates to a non-negative integer or null (recommended type is i64). Declares the number of records that should be returned. An expression evaluating to null indicates that all records should be returned. | Optional, defaults to a null literal. |

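The offset and count semantics above can be sketched in Python, assuming the two expressions have already been evaluated to a Python `int` or `None` (standing in for null). The `fetch` function is a hypothetical name, and the deprecated `count = -1` convention is not modeled.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def fetch(rows: Sequence[T], offset: Optional[int] = 0, count: Optional[int] = None) -> List[T]:
    """Skip `offset` records, then return up to `count` records.

    A null (None) offset is treated as 0 and a null count means "all records".
    """
    if offset is None:
        offset = 0
    if offset < 0 or (count is not None and count < 0):
        raise ValueError("offset and count must evaluate to non-negative integers")
    end = None if count is None else offset + count
    return list(rows[offset:end])


print(fetch(list(range(10)), offset=2, count=3))  # [2, 3, 4]
```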
message FetchRel {
  RelCommon common = 1;
  Rel input = 2;
  // Note: A oneof field is inherently optional, whereas individual fields
  // within a oneof cannot be marked as optional. The unset state of offset
  // should therefore be checked at the oneof level. Unset is treated as 0.
  oneof offset_mode {
    // the offset expressed in number of records
    // Deprecated: use `offset_expr` instead
    int64 offset = 3 [deprecated = true];
    // Expression evaluated into a non-negative integer specifying the number
    // of records to skip. An expression evaluating to null is treated as 0.
    // Evaluating to a negative integer should result in an error.
    // Recommended type for offset is int64.
    Expression offset_expr = 5;
  }
  // Note: A oneof field is inherently optional, whereas individual fields
  // within a oneof cannot be marked as optional. The unset state of count
  // should therefore be checked at the oneof level. Unset is treated as ALL.
  oneof count_mode {
    // the amount of records to return
    // use -1 to signal that ALL records should be returned
    // Deprecated: use `count_expr` instead
    int64 count = 4 [deprecated = true];
    // Expression evaluated into a non-negative integer specifying the number
    // of records to return. An expression evaluating to null signals that ALL
    // records should be returned.
    // Evaluating to a negative integer should result in an error.
    // Recommended type for count is int64.
    Expression count_expr = 6;
  }
  substrait.extensions.AdvancedExtension advanced_extension = 10;

}

Aggregate Operation

The aggregate operation groups input data on one or more sets of grouping keys, calculating each measure for each combination of grouping keys.

| Signature | Value |
| --- | --- |
| Inputs | 1 |
| Outputs | 1 |
| Property Maintenance | Maintains distribution if all distribution fields are contained in every grouping set. No orderedness guaranteed. |
| Direct Output Order | The list of grouping expressions in declaration order followed by the list of measures in declaration order, followed by an i32 describing the associated particular grouping set the value is derived from (if applicable). |

In its simplest form, an aggregation has only measures. In this case, all records are folded into one, and a column is returned for each aggregate expression in the measures list.

Grouping sets can be used for finer-grained control over which records are folded. A grouping set consists of zero or more references to the list of grouping expressions. Within a grouping set, two records will be folded together if and only if they have the same values for each of the expressions in the grouping set. The values returned by the grouping expressions will be returned as columns to the left of the columns for the aggregate expressions. Each of the grouping expressions must occur in at least one of the grouping sets. If a grouping set contains no grouping expressions, all rows will be folded for that grouping set. (Having a single grouping set with no grouping expressions is thus equivalent to not having any grouping sets.)

It is possible to specify multiple grouping sets in a single aggregate operation. The grouping sets behave more or less independently, with each returned record belonging to one of the grouping sets. The values for the grouping expression columns that are not part of the grouping set for a particular record will be set to null. The columns for grouping expressions that do not appear in all grouping sets will be nullable (regardless of the nullability of the type returned by the grouping expression) to accommodate the null insertion.

To further disambiguate which record belongs to which grouping set, an aggregate relation with more than one grouping set receives an extra i32 column on the right-hand side. The value of this field will be the zero-based index of the grouping set that yielded the record.

If at least one grouping expression is present, the aggregation is allowed to not have any aggregate expressions. An aggregate relation is invalid if it would yield zero columns.
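The grouping-set behavior described above can be sketched in Python. This is a minimal, hash-based sketch under stated assumptions: rows are tuples, grouping expressions and measures are hypothetical Python callables, and an empty grouping set yields one record even for empty input (SQL behavior, assumed here). It follows the output order grouping columns, then measures, then the zero-based grouping-set index when there is more than one grouping set.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

Row = Tuple[Any, ...]
GroupingExpr = Callable[[Row], Any]   # hypothetical stand-in for an expression
Measure = Callable[[List[Row]], Any]  # folds the records of one group


def aggregate(rows: Sequence[Row], grouping_expressions: List[GroupingExpr],
              grouping_sets: List[List[int]], measures: List[Measure]) -> List[Row]:
    """Grouping columns, then measures, then (if >1 grouping set) the set index."""
    out: List[Row] = []
    # No grouping sets behaves like a single empty grouping set.
    for set_index, refs in enumerate(grouping_sets or [[]]):
        groups: Dict[Row, List[Row]] = {}
        for row in rows:
            # Grouping expressions not referenced by this set are filled with null.
            key = tuple(expr(row) if i in refs else None
                        for i, expr in enumerate(grouping_expressions))
            groups.setdefault(key, []).append(row)
        if not refs and not groups:
            # Assumption (SQL behavior): an empty grouping set yields one record.
            groups[(None,) * len(grouping_expressions)] = []
        for key, members in groups.items():
            record = key + tuple(measure(members) for measure in measures)
            if len(grouping_sets) > 1:
                record += (set_index,)
            out.append(record)
    return out


def total(rs: List[Row]) -> int:
    return sum(r[2] for r in rs)


sales = [("a", "x", 1), ("a", "y", 2), ("b", "x", 3)]  # hypothetical input
print(aggregate(sales, [lambda r: r[0], lambda r: r[1]], [[0], [1]], [total]))
# [('a', None, 3, 0), ('b', None, 3, 0), (None, 'x', 4, 1), (None, 'y', 2, 1)]
```

The trailing index column is what distinguishes a null produced by the grouping set from a null that was actually present in the data.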

Aggregate Properties

| Property | Description | Required |
| --- | --- | --- |
| Input | The relational input. | Required |
| Grouping Sets | One or more grouping sets. | Optional, required if no measures. |
| Per Grouping Set | A list of grouping expressions that the aggregation measures should be calculated for. | Optional. |
| Measures | A list of one or more aggregate expressions along with an optional filter. | Optional, required if no grouping sets. |

message AggregateRel {
  RelCommon common = 1;

  // Input of the aggregation
  Rel input = 2;

  // A list of zero or more grouping sets that the aggregation measures should
  // be calculated for. There must be at least one grouping set if there are no
  // measures (but it can be the empty grouping set).
  repeated Grouping groupings = 3;

  // A list of one or more aggregate expressions along with an optional filter.
  // Required if there are no groupings.
  repeated Measure measures = 4;

  // A list of zero or more grouping expressions that grouping sets (i.e.,
  // `Grouping` messages in the `groupings` field) can reference. Each
  // expression in this list must be referred to by at least one
  // `Grouping.expression_references`.
  repeated Expression grouping_expressions = 5;

  substrait.extensions.AdvancedExtension advanced_extension = 10;

  message Grouping {
    // Deprecated in favor of `expression_references` below.
    repeated Expression grouping_expressions = 1 [deprecated = true];

    // A list of zero or more references to grouping expressions, i.e., indices
    // into the `grouping_expressions` list.
    repeated uint32 expression_references = 2;
  }

  message Measure {
    AggregateFunction measure = 1;

    // An optional boolean expression that acts to filter which records are
    // included in the measure. True means include this record for calculation
    // within the measure.
    // Helps to support SUM(<c>) FILTER(WHERE...) syntax without masking opportunities for optimization
    Expression filter = 2;
  }

}

Reference Operator

The reference operator is used to construct DAGs of operations. In a Plan we can have multiple Rel representing various computations with potentially multiple outputs. The ReferenceRel is used to express the fact that multiple Rel might be sharing subtrees of computation. This can be used to express arbitrary DAGs as well as represent multi-query optimizations.

As a concrete example, consider the two queries SELECT * FROM A JOIN B JOIN C and SELECT * FROM A JOIN B JOIN D. We can use the ReferenceRel to highlight the A JOIN B subtree shared by both queries by creating a plan with three Rels: one expressing A JOIN B (at position 0 in the plan), a second expressing ReferenceRel(0) JOIN C, and a third expressing ReferenceRel(0) JOIN D. This avoids the redundancy of expressing A JOIN B twice.

Signature Value
Inputs 1
Outputs 1
Property Maintenance Maintains all properties of the input
Direct Output Order Maintains order

Reference Properties

Property Description Required
Referred Rel A zero-indexed positional reference to a Rel defined within the same Plan. Required
message ReferenceRel {
  int32 subtree_ordinal = 1;

}
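The A JOIN B example above can be sketched as a small evaluator that resolves `subtree_ordinal` against the list of top-level relations and caches each result, so the shared subtree is computed only once. The `Scan`, `Join`, `Ref` and `PlanEvaluator` names are hypothetical; strings stand in for the data a real engine would produce, and the cycle check reflects the assumption that references must form a DAG.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Join:
    left: "Node"
    right: "Node"


@dataclass(frozen=True)
class Ref:
    subtree_ordinal: int  # mirrors ReferenceRel.subtree_ordinal (zero-indexed)


Node = Union[Scan, Join, Ref]


class PlanEvaluator:
    """Evaluates top-level relations, computing each referenced one once."""

    def __init__(self, relations: list[Node]):
        self.relations = relations
        self.cache: dict[int, str] = {}
        self.in_progress: set[int] = set()
        self.computed: list[int] = []  # ordinals actually evaluated, in order

    def eval_relation(self, ordinal: int) -> str:
        if ordinal in self.cache:
            return self.cache[ordinal]
        if ordinal in self.in_progress:
            raise ValueError(f"cycle through relation {ordinal}; plan is not a DAG")
        self.in_progress.add(ordinal)
        result = self._eval(self.relations[ordinal])
        self.in_progress.discard(ordinal)
        self.computed.append(ordinal)
        self.cache[ordinal] = result
        return result

    def _eval(self, node: Node) -> str:
        if isinstance(node, Scan):
            return node.table
        if isinstance(node, Join):
            return f"({self._eval(node.left)} JOIN {self._eval(node.right)})"
        if isinstance(node, Ref):
            return self.eval_relation(node.subtree_ordinal)
        raise TypeError(f"unknown node {node!r}")
```

Evaluating relations 1 and 2 of `[Join(Scan("A"), Scan("B")), Join(Ref(0), Scan("C")), Join(Ref(0), Scan("D"))]` computes relation 0 only once; the second reference is served from the cache.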

Write Operator

The write operator is an operator that consumes one input and writes it to storage. This can range from writing to a Parquet file to INSERT/DELETE/UPDATE operations in a database.

Signature Value
Inputs 1
Outputs 1
Property Maintenance Output depends on OutputMode (none, or modified records)
Direct Output Order Unchanged from input

Write Properties

Property Description Required
Write Type Definition of which object we are operating on (e.g., a fully-qualified table name). Required
CTAS Schema The names of all the columns and their type for a CREATE TABLE AS. Required only for CTAS
Write Operator Which type of operation we are performing (INSERT/DELETE/UPDATE/CTAS). Required
Rel Input The Rel representing which records we will be operating on (e.g., VALUES for an INSERT, or which records to DELETE, or records and after-image of their values for UPDATE). Required
Create Mode This determines what should happen if the table already exists (ERROR/REPLACE/IGNORE/APPEND). Required only for CTAS
Output Mode For write operations that modify a DB, it is important to control which records to “return”. The common default is NO_OUTPUT, where nothing is returned. Alternatively, MODIFIED_RECORDS returns the modified records, which can be further manipulated by layering more rels on top of this WriteRel (e.g., to “count how many records were updated”). This mode returns the after-image of the change. To return the before-image (or both), one can use the reference mechanism and have multiple return values. Required for VIEW CREATE/CREATE_OR_REPLACE/ALTER
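The difference between the two output modes can be sketched against an in-memory table. The `execute_write` function and the reduced `WriteOp`/`OutputMode` enums are a hypothetical Python model, not a Substrait API. What a DELETE reports as its modified records is not pinned down above, so returning the removed rows is an assumption of this sketch.

```python
from enum import Enum


class WriteOp(Enum):
    INSERT = 1
    DELETE = 2


class OutputMode(Enum):
    NO_OUTPUT = 1
    MODIFIED_RECORDS = 2


def execute_write(table: list[tuple], op: WriteOp, input_rows: list[tuple],
                  output: OutputMode) -> list[tuple]:
    """Applies `op` to `table` in place and returns rows according to `output`."""
    if op is WriteOp.INSERT:
        table.extend(input_rows)
        modified = list(input_rows)
    elif op is WriteOp.DELETE:
        # Assumption: a DELETE reports the removed rows as its modified records.
        doomed = set(input_rows)
        modified = [row for row in table if row in doomed]
        table[:] = [row for row in table if row not in doomed]
    else:
        raise ValueError(f"unsupported op {op}")
    # NO_OUTPUT returns nothing; MODIFIED_RECORDS returns the affected rows.
    return modified if output is OutputMode.MODIFIED_RECORDS else []
```

Layering a count on top of a MODIFIED_RECORDS write then amounts to `len(execute_write(...))`, the “count how many records were updated” pattern mentioned above.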

Write Definition Types

Adding new Write Definition Types

If you have a write definition that’s not covered here, see the process for adding new write definition types.

Write definition types are built by the community and added to the specification.

message WriteRel {
  // Definition of which TABLE we are operating on
  oneof write_type {
    NamedObjectWrite named_table = 1;
    ExtensionObject extension_table = 2;
  }

  // The schema of the table (must align with Rel input (e.g., number of leaf fields must match))
  NamedStruct table_schema = 3;

  // The type of operation to perform
  WriteOp op = 4;

  // The relation that determines the records to add/remove/modify.
  // Its schema must match table_schema. Default values must be explicitly stated
  // in a ProjectRel at the top of the input. The match must also
  // occur in case of DELETE to ensure multi-engine plans are unequivocal.
  Rel input = 5;

  CreateMode create_mode = 8; // Used with CTAS to determine what to do if the table already exists

  // Output mode determines what is the output of executing this rel
  OutputMode output = 6;
  RelCommon common = 7;

  enum WriteOp {
    WRITE_OP_UNSPECIFIED = 0;
    // The insert of new records in a table
    WRITE_OP_INSERT = 1;
    // The removal of records from a table
    WRITE_OP_DELETE = 2;
    // The modification of existing records within a table
    WRITE_OP_UPDATE = 3;
    // The Creation of a new table, and the insert of new records in the table
    WRITE_OP_CTAS = 4;
  }

  enum CreateMode {
    CREATE_MODE_UNSPECIFIED = 0;
    CREATE_MODE_APPEND_IF_EXISTS = 1; // Append the data to the table if it already exists
    CREATE_MODE_REPLACE_IF_EXISTS = 2; // Replace the table if it already exists ("OR REPLACE")
    CREATE_MODE_IGNORE_IF_EXISTS = 3; // Ignore the request if the table already exists ("IF NOT EXISTS")
    CREATE_MODE_ERROR_IF_EXISTS = 4; // Throw an error if the table already exists (default behavior)
  }

  enum OutputMode {
    OUTPUT_MODE_UNSPECIFIED = 0;
    // return no records at all
    OUTPUT_MODE_NO_OUTPUT = 1;
    // This mode makes the operator return all the records INSERTED/DELETED/UPDATED by the operator.
    // The operator returns the AFTER-image of any change. This can be further manipulated by operators upstream
    // (e.g., returning the typical "count of modified records").
    // For scenarios in which the BEFORE image is required, the user must implement a spool (via references to
    // subplans in the body of the Rel input) and return those via another PlanRel.relations entry.
    OUTPUT_MODE_MODIFIED_RECORDS = 2;
  }

}
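The `CreateMode` values for CTAS can be read as a small decision table. The sketch below applies them to a hypothetical in-memory catalog mapping table names to rows; `ctas` and `TableExistsError` are illustrative names, schema checks are omitted, and treating `CREATE_MODE_UNSPECIFIED` like the default error behavior is an assumption.

```python
from enum import Enum


class CreateMode(Enum):
    UNSPECIFIED = 0
    APPEND_IF_EXISTS = 1
    REPLACE_IF_EXISTS = 2
    IGNORE_IF_EXISTS = 3
    ERROR_IF_EXISTS = 4


class TableExistsError(Exception):
    pass


def ctas(catalog: dict[str, list[tuple]], name: str, rows: list[tuple],
         mode: CreateMode) -> None:
    """CREATE TABLE AS against an in-memory catalog (hypothetical model)."""
    if name not in catalog:
        catalog[name] = list(rows)  # the table is new, so every mode creates it
    elif mode is CreateMode.APPEND_IF_EXISTS:
        catalog[name].extend(rows)
    elif mode is CreateMode.REPLACE_IF_EXISTS:
        catalog[name] = list(rows)  # "OR REPLACE"
    elif mode is CreateMode.IGNORE_IF_EXISTS:
        pass  # "IF NOT EXISTS": leave the existing table untouched
    else:
        # ERROR_IF_EXISTS; UNSPECIFIED is also treated as an error here (assumption).
        raise TableExistsError(name)
```

Only the existing-table case branches on the mode; when the table is absent, all modes create it with the input rows.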

Virtual Table

Property Description Required
Name The in-memory name to give the dataset. Required
Pin Whether it is okay to remove this dataset from memory or it should be kept in memory. Optional, defaults to false.

Files Type

Property Description Required
Path A URI to write the data to. Supports the inclusion of field references that are listed as available in properties as a “rotation description field”. Required
Format Enumeration of available formats. Only current option is PARQUET. Required

Update Operator

The update operator applies a set of column transformations on a named table and writes the result to storage.

Signature Value
Inputs 0
Outputs 1
Property Maintenance Output is number of modified records

Update Properties

Property Description Required
Update Type Definition of which object we are operating on (e.g., a fully-qualified table name). Required
Table Schema The names and types of all the columns of the input table Required
Update Condition The condition that must be met for a record to be updated. Required
Update Transformations The set of column updates to be applied to the table. Required
message UpdateRel {
  oneof update_type {
    NamedTable named_table = 1;
  }

  NamedStruct table_schema = 2; // The full schema of the named_table
  Expression condition = 3; // condition to be met for the update to be applied on a record

  // The list of transformations to apply to the columns of the named_table
  repeated TransformExpression transformations = 4;

  message TransformExpression {
    Expression transformation = 1; // the transformation to apply
    int32 column_target = 2; // index of the column to apply the transformation to
  }

}
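The interplay of `condition`, `transformations` and `column_target` can be sketched as an in-place update over list-based rows that returns the number of modified records. `apply_update` and the Python `TransformExpression` dataclass are a hypothetical model of the message. Evaluating every transformation against the pre-update row (so that swapping two columns works, as in SQL) is an assumption this section does not state.

```python
from dataclasses import dataclass
from typing import Any, Callable

Row = list[Any]


@dataclass
class TransformExpression:
    # Mirrors UpdateRel.TransformExpression (hypothetical Python model).
    transformation: Callable[[Row], Any]
    column_target: int  # index of the column to overwrite


def apply_update(table: list[Row], condition: Callable[[Row], bool],
                 transformations: list[TransformExpression]) -> int:
    """Updates matching rows in place and returns the number modified."""
    modified = 0
    for i, row in enumerate(table):
        if not condition(row):
            continue
        # Assumption: every transformation sees the pre-update row (SQL-style).
        new_values = [(t.column_target, t.transformation(row)) for t in transformations]
        updated = list(row)
        for target, value in new_values:
            updated[target] = value
        table[i] = updated
        modified += 1
    return modified
```

The returned count corresponds to the “number of modified records” output listed in the signature table.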

DDL (Data Definition Language) Operator

The DDL operator defines modifications of a database schema (CREATE/DROP/ALTER for TABLEs and VIEWs).

Signature Value
Inputs 1
Outputs 0
Property Maintenance N/A (no output)
Direct Output Order N/A

DDL Properties

Property Description Required
Write Type Definition of which type of object we are operating on. Required
Table Schema The names of all the columns and their type. Required (except for DROP operations)
Table Defaults The set of default values for this table. Required (except for DROP operations)
DDL Object Which type of object we are operating on (e.g., TABLE or VIEW). Required
DDL Operator The operation to be performed (e.g., CREATE/ALTER/DROP). Required
View Definition A Rel representing the “body” of a VIEW. Required for VIEW CREATE/CREATE_OR_REPLACE/ALTER
message DdlRel {
  // Definition of which type of object we are operating on
  oneof write_type {
    NamedObjectWrite named_object = 1;
    ExtensionObject extension_object = 2;
  }

  // The columns that will be modified (representing after-image of a schema change)
  NamedStruct table_schema = 3;
  // The default values for the columns (representing after-image of a schema change)
  // E.g., in case of an ALTER TABLE that changes some of the column default values, we expect
  // the table_defaults Struct to report a full list of default values reflecting the result of applying
  // the ALTER TABLE operator successfully
  Expression.Literal.Struct table_defaults = 4;

  // Which type of object we operate on
  DdlObject object = 5;

  // The type of operation to perform
  DdlOp op = 6;

  // The body of the CREATE VIEW
  Rel view_definition = 7;
  RelCommon common = 8;

  enum DdlObject {
    DDL_OBJECT_UNSPECIFIED = 0;
    // A Table object in the system
    DDL_OBJECT_TABLE = 1;
    // A View object in the system
    DDL_OBJECT_VIEW = 2;
  }

  enum DdlOp {
    DDL_OP_UNSPECIFIED = 0;
    // A create operation (for any object)
    DDL_OP_CREATE = 1;
    // A create operation if the object does not exist, or replaces it (equivalent to a DROP + CREATE) if the object already exists
    DDL_OP_CREATE_OR_REPLACE = 2;
    // An operation that modifies the schema (e.g., column names, types, default values) for the target object
    DDL_OP_ALTER = 3;
    // An operation that removes an object from the system
    DDL_OP_DROP = 4;
    // An operation that removes an object from the system (without throwing an exception if the object did not exist)
    DDL_OP_DROP_IF_EXIST = 5;
  }
  // TODO: add PK/constraints/indexes/etc.?

}
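The `DdlOp` values can be sketched against a hypothetical in-memory catalog that maps object names to column-name/type schemas. `apply_ddl` and `CatalogError` are illustrative names. The sketch follows the points stated above (the schema is the after-image and is required except for DROP, and only DROP_IF_EXIST tolerates a missing object); raising on CREATE of an existing object and on ALTER of a missing one are assumptions.

```python
from enum import Enum
from typing import Optional


class DdlOp(Enum):
    CREATE = 1
    CREATE_OR_REPLACE = 2
    ALTER = 3
    DROP = 4
    DROP_IF_EXIST = 5


class CatalogError(Exception):
    pass


def apply_ddl(catalog: dict[str, dict[str, str]], name: str, op: DdlOp,
              schema: Optional[dict[str, str]] = None) -> None:
    """Applies a DDL operation to an in-memory catalog (hypothetical model)."""
    if op in (DdlOp.DROP, DdlOp.DROP_IF_EXIST):
        if name in catalog:
            del catalog[name]
        elif op is DdlOp.DROP:
            raise CatalogError(f"{name} does not exist")
        return
    # The schema (after-image) is required for everything except DROP operations.
    if schema is None:
        raise ValueError("table_schema is required for non-DROP operations")
    if op is DdlOp.CREATE and name in catalog:
        raise CatalogError(f"{name} already exists")  # assumption
    if op is DdlOp.ALTER and name not in catalog:
        raise CatalogError(f"{name} does not exist")  # assumption
    # CREATE, CREATE_OR_REPLACE and ALTER all install the after-image schema.
    catalog[name] = dict(schema)
```

Because `table_schema` carries the full after-image, ALTER here simply replaces the stored schema rather than applying a diff.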
Discussion Points
  • How should correlated operations be handled?