Scans of global tables will produce `Distribution::Global`;
conflict resolution is described further in the text.
# Distributions and their invariants
A Distribution is a set of properties that must hold for the output table
of an operator after the subtree rooted at this operator has been dispatched to
the storages and materialized.
Distribution variants of relational operators:
1. **Any** - the output table of the relational operator is located on several
storages (possibly zero or one). The exact locations are determined during bucket discovery.
```sql
select b from segment_a where a = 1
```
Here `segment_a` is sharded by `a`, so the filter `a = 1` guarantees that
we execute the subtree on a single node.
2. **Segment(key)** - the same as **Any**, but the data is spread as if the output
table were sharded by **key**.
3. **Single** - the subtree with the root in such an operator must be
executed on a single node. Therefore the output table will be located
on a single node as well.
4. **Global** - if the subtree with the root in such an operator is executed on
several nodes, we will have the exact same output table on each node. However,
during the planning stage we don't know on which nodes the subtree will be executed;
this becomes known only after `bucket_discovery`.
Example: wherever we execute the `Values` operator, it will
return the same table (because this table is serialized inside `Plan`).
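To make the variants above concrete, here is a minimal sketch of how such an enum could look; the variant names mirror this document, but the exact shape (e.g. how the sharding key of **Segment** is stored) is an assumption, not the planner's actual definition.
```rust
// A sketch only: field shapes are assumptions, not the actual planner types.
enum Distribution {
    /// The output table is spread across some storages (possibly zero or
    /// one); the exact locations are determined during bucket discovery.
    Any,
    /// Same as `Any`, but the data is spread as if the output table were
    /// sharded by this key (here: positions of the key columns).
    Segment { key: Vec<usize> },
    /// The subtree must be executed on a single node, so the output table
    /// is located on a single node.
    Single,
    /// Every node that executes the subtree materializes the exact same
    /// output table.
    Global,
}
```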
# New buckets variant Any
Let's introduce a new buckets variant: `Buckets::Any`. If the subtree's root has
`Buckets::Any`, then the subtree must be executed on some arbitrary single node.
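For illustration, a minimal sketch of the buckets variants after this change; `Filtered` is a hypothetical stand-in for whatever variants carry an explicit bucket set, not the real definition.
```rust
// A sketch only: `Filtered` is a hypothetical stand-in variant.
enum Buckets {
    /// Execute the subtree on some arbitrary single node.
    Any,
    /// Execute the subtree on all buckets.
    All,
    /// Execute the subtree only on the listed buckets.
    Filtered(Vec<u64>),
}
```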
# Notable changes
* The default bucket variant for an expression not referencing any table is `Buckets::Any`.
* If an expression references a segment table, then it is `Buckets::All`.
* `Motion(full)` will have `Distribution::Global`, as well as `Values`.
* From now on, `Motion(full)` will give `Buckets::Any` during `bucket_discovery`,
because it now has `Distribution::Global` or `Distribution::Single`.
* We will drop `Buckets::Single` in favor of `Buckets::Any`.
* Scans of global tables give `Buckets::Any`.
* `Buckets::Any` - the weakest buckets variant; it yields to any other
buckets variant in a disjunction/conjunction (a sketch of this rule follows the list).
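A minimal sketch of that rule, reusing the `Buckets` enum sketched above (the real conflict-resolution code is certainly richer):
```rust
// A sketch only: `Buckets::Any` loses to any other variant.
fn disjunct(a: Buckets, b: Buckets) -> Buckets {
    match (a, b) {
        // The weakest variant yields to the other side.
        (Buckets::Any, other) | (other, Buckets::Any) => other,
        // `All` absorbs everything else.
        (Buckets::All, _) | (_, Buckets::All) => Buckets::All,
        // Two explicit bucket sets are merged.
        (Buckets::Filtered(mut x), Buckets::Filtered(y)) => {
            x.extend(y);
            x.sort_unstable();
            x.dedup();
            Buckets::Filtered(x)
        }
    }
}
```
Conjunction would be symmetric, except that explicit bucket sets are intersected instead of merged.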
Example:
```
...
```

...
Do the inner join and then add the remaining rows from the global table on the router.
To avoid using extra space, we will sort the result table on the left table's columns
and then use binary search to find each tuple.
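A minimal sketch of this router-side step, assuming rows are plain vectors of nullable values; all names here are illustrative, not the actual executor API.
```rust
// A sketch only: simplified row representation.
type Row = Vec<Option<i64>>;

/// Append the rows of the global (left) table that found no match during
/// the map-stage inner join, padding the right side with NULLs.
fn reduce_left_join(
    mut joined: Vec<Row>, // map-stage inner-join output
    left: &[Row],         // rows of the global (left) table
    left_width: usize,    // number of left-table columns
    right_width: usize,   // number of right-table columns
) -> Vec<Row> {
    // Sort once on the left-table columns so every probe is a binary search.
    joined.sort_by(|a, b| a[..left_width].partial_cmp(&b[..left_width]).unwrap());
    let mut missing = Vec::new();
    for l in left {
        let matched = joined
            .binary_search_by(|j| j[..left_width].partial_cmp(&l[..]).unwrap())
            .is_ok();
        if !matched {
            // The inner join produced nothing for this row: emit it + NULLs.
            let mut row = l.clone();
            row.extend(std::iter::repeat(None).take(right_width));
            missing.push(row);
        }
    }
    joined.extend(missing);
    joined
}
```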
In our case, the map stage is doing an inner join on each node. However, the difference
is that the reduce stage is done via the router (not local SQL). We do this router
reduce stage via motion opcodes. We will have two additional opcodes: `sort` - to
sort the table from the map stage, and `left_join_append(global table)` - to append
the missing rows from the global table.
Implementation details: during conflict resolution, if we see a left join whose
children have global and segment distributions respectively, we do a
transformation that splits the plan into 2 stages: an inner join on all nodes (map)
and adding the missing rows from the left table (reduce). In the map stage's projection,
only the columns needed by the operators above must be used, so that we transfer
less data through the wire.
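As an illustration of that pruning, a sketch that assumes the set of columns referenced above the join has already been collected (the function name and signature are hypothetical):
```rust
use std::collections::HashSet;

/// A sketch only: pick the columns for the map-stage projection. We keep
/// every left-table column (the reduce stage needs them to detect unmatched
/// rows) and only the right-table columns referenced higher in the plan.
fn map_projection_columns(
    left_width: usize,
    right_width: usize,
    used_above: &HashSet<usize>, // absolute column positions used upstream
) -> Vec<usize> {
    (0..left_width)
        .chain((left_width..left_width + right_width).filter(|c| used_above.contains(c)))
        .collect()
}
```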
...
#### 7.1 Unfinished left join with group by
Suppose _global_ has columns a, b, and _segment_ has columns a, c. In the projection below the motion we should try to minimize the number of columns sent through the wire. We always need all columns from the left table, but from the right table we only need the columns that are used higher in the plan.
```sql
select g.a, s.a from global as g
...
```

...
We can't do union all on all nodes because we will get duplicate results.
Solution: add a `where` filter on the primary key to the global table scan. Before sending the query to the storages, parametrize the condition with the primary key values for different storages.
Implementation details: currently our executor does not parametrize queries in any way, so I think we will need to save some context information at the planning stage: which plan nodes must be parametrized and how.
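For instance, the saved context could be as small as a map from plan node ids to the parameter slots that must be filled with storage-specific key bounds; a sketch with hypothetical names:
```rust
use std::collections::HashMap;

/// A sketch only: planning-stage bookkeeping for nodes whose primary-key
/// condition must be parametrized before dispatch.
struct ParametrizeContext {
    /// plan node id -> positions of the parameters to substitute per storage
    nodes: HashMap<usize, Vec<usize>>,
}
```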
...

```sql
...
union all
select * from segment
```
Suppose we know that the query must be executed on two replicasets, then:
we will scan `global` on the router and find the primary key value that splits the table in half.
Suppose it is `a1`, the first value of the key is `a0`, and the last value is `a2`.
replicaset 1:
...
### global vs segment
Calculate the bucket id for the global table rows and create a temporary table
that will hold only the rows that belong to the current storage. Use that table
instead of the original global table in the query.
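A minimal sketch of that filtering, with a simplified row representation and the cluster's bucket hashing treated as an opaque function (all names are illustrative):
```rust
use std::collections::HashSet;

/// A sketch only: keep the global-table rows whose bucket id is owned by
/// the current storage; the result populates the temporary table.
fn local_global_rows(
    global_rows: Vec<Vec<i64>>,
    shard_key: &[usize],               // column positions of the sharding key
    my_buckets: &HashSet<u64>,         // buckets owned by this storage
    bucket_id: impl Fn(&[i64]) -> u64, // the cluster's bucket hashing function
) -> Vec<Vec<i64>> {
    global_rows
        .into_iter()
        .filter(|row| {
            let key: Vec<i64> = shard_key.iter().map(|&i| row[i]).collect();
            my_buckets.contains(&bucket_id(&key))
        })
        .collect()
}
```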