Join is a common operation in stream processing. This article introduces RisingWave join use cases, basic stream join principles, and join state characteristics.
Background
Join is a common operation in stream processing that combines two or more data streams based on certain conditions. This is particularly useful for scenarios that require analyzing and understanding real-time data from multiple sources. Here are some common use cases of stream joins:
Real-time ad performance monitoring: In a real-time advertising system, ad impressions and user clicks are recorded and collected in real-time. We can use stream processing to handle this data in real time, and use joins to merge ad impression data and user click data by ad ID and user ID. This allows us to calculate real-time click-through rates for each ad to monitor ad performance.
Server cluster performance anomaly detection: Cluster monitoring systems collect network interface metrics and TCP layer performance metrics, and use joins to show all TCP metrics for network interfaces with abnormal traffic.
Real-time recommendation systems: E-commerce platforms may need to join user behaviour data (such as clicks and purchases) with product information to provide personalized product recommendations to users in real-time.
Financial transaction risk control: Banks or financial institutions may need to join transaction data with user risk rating data to assess transaction risks in real-time.
ETL scenarios: Stream processing systems process data from different upstream sources (e.g. Kafka, databases) and use joins to flatten different data sources into a wide data table before writing to downstream analytics systems.
Below, we will introduce the various technical explorations and optimizations that RisingWave, as a distributed streaming database, has undertaken specifically regarding joins.
Characteristics of Stream Processing Joins
RisingWave query processing can be divided into two categories - streaming queries and batch queries. Batch queries process bounded datasets so joins are handled the same as in traditional databases.
Streaming queries are mainly expressed as CREATE MATERIALIZED VIEW statements. Joins in streaming queries need to handle unbounded data streams: any change in either of the two upstream inputs is incrementally turned into a change in the join result and output downstream. Join is a stateful operator, and how to manage the join state is an important topic in streaming databases.
Symmetric Hash Join
The symmetric hash join algorithm[1], which requires an equijoin condition, is widely utilized in stream processing. The idea is simple - maintain a hash table for each input of the join (where the hash key is the join key).
Whenever a row arrives from one side, it is first inserted into that side's own hash table, and then the other side's hash table is probed to compute the join outputs. The two inputs therefore play symmetric roles. The hash tables are also called join states.
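The insert-then-probe step can be sketched in a few lines of Python. This is only a toy model for an inner equijoin over append-only inputs, not RisingWave's implementation; the class name `SymmetricHashJoin` and the method `on_row` are made up for illustration.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Toy inner equijoin over two append-only inputs (illustrative only)."""

    def __init__(self):
        # One hash table per input, keyed by the join key: this is the join state.
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}

    def on_row(self, side, key, row):
        """Process one input row and return the join outputs it produces."""
        other = "right" if side == "left" else "left"
        # 1. Insert the row into its own side's hash table.
        self.state[side][key].append(row)
        # 2. Probe the other side's hash table for rows with the same key.
        matches = self.state[other].get(key, [])
        # Always emit (left_row, right_row), whichever side arrived last.
        if side == "left":
            return [(row, m) for m in matches]
        return [(m, row) for m in matches]


join = SymmetricHashJoin()
out1 = join.on_row("left", 1, "L-a")   # nothing on the right side yet
out2 = join.on_row("right", 1, "R-x")  # joins with L-a
out3 = join.on_row("left", 1, "L-b")   # joins with R-x
```

Note that neither hash table ever shrinks in this sketch, which is exactly the unbounded state problem discussed next.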
Unbounded State
Since join inputs are unbounded, it can be inferred that the join state is also unbounded. This clearly causes storage problems. RisingWave's decoupled compute and storage architecture stores the join state in object storage. Compared to traditional local storage, object storage has many advantages like high scalability, reliability, and cost-effectiveness. Theoretically, the storage capacity limit of RisingWave is the same as that of object storage. To mitigate the higher access latency of object storage, RisingWave utilizes memory and local disks to cache the files of object storage and manages the caches with the Least Recently Used (LRU) strategy. We have a dedicated article introducing storage optimizations for reading and writing streaming states.
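To make the caching idea concrete, here is a minimal LRU cache in front of a slow store, written in Python. It is a sketch under simplifying assumptions (a single cache tier, whole values instead of files), and the names `LruCache`, `fetch_remote`, and `object_store` are hypothetical, not RisingWave APIs.

```python
from collections import OrderedDict


class LruCache:
    """Toy LRU cache in front of a slow remote store (illustrative only)."""

    def __init__(self, capacity, fetch_remote):
        self.capacity = capacity
        self.fetch_remote = fetch_remote  # hypothetical callable: key -> value
        self.entries = OrderedDict()      # ordered from least to most recently used
        self.remote_reads = 0

    def get(self, key):
        if key in self.entries:
            # Cache hit: mark the entry as most recently used.
            self.entries.move_to_end(key)
            return self.entries[key]
        # Cache miss: pay the remote access latency once, then cache the value.
        self.remote_reads += 1
        value = self.fetch_remote(key)
        self.entries[key] = value
        if len(self.entries) > self.capacity:
            # Evict the least recently used entry.
            self.entries.popitem(last=False)
        return value


object_store = {"block-1": "a", "block-2": "b", "block-3": "c"}
cache = LruCache(capacity=2, fetch_remote=object_store.__getitem__)
cache.get("block-1")  # miss
cache.get("block-2")  # miss
cache.get("block-1")  # hit, block-1 becomes most recently used
cache.get("block-3")  # miss, evicts block-2
```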
Watermark & Window
Even though join inputs may be unbounded, we often don't want unbounded join state sizes. Watermark and windowed joins can constrain the join state to a finite size. Stream data is generally unordered, but watermarks allow us to deduce a partial order. For example, we can set a -20s watermark on a source stream's timestamp column, meaning if we see a row with timestamp T, we know all rows with timestamp <= T-20s have arrived. With this guarantee, we can pass a Watermark message on the stream, indicating that the timestamp of all data after the Watermark message is greater than T-20s. This allows us to express the partial order property on streams.
-- Define a source with watermark
CREATE SOURCE s1 (
id int,
value int,
ts TIMESTAMP,
WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) WITH (connector = 'datagen');
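As a rough illustration of what a 20-second watermark means, the Python sketch below tracks the largest timestamp seen so far and derives the watermark from it. It assumes, as described above, that rows with a timestamp at or below the current watermark count as late; rejecting such rows and the names `WatermarkTracker` and `observe` are choices made for this sketch only.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Toy watermark generator: watermark = max timestamp seen - delay."""

    def __init__(self, delay):
        self.delay = delay
        self.watermark = None  # no watermark until the first row arrives

    def observe(self, ts):
        """Return True if the row is accepted, False if it is late."""
        if self.watermark is not None and ts <= self.watermark:
            # The watermark promised that no such row would arrive any more.
            return False
        candidate = ts - self.delay
        # Watermarks only move forward, even when rows arrive out of order.
        if self.watermark is None or candidate > self.watermark:
            self.watermark = candidate
        return True


base = datetime(2024, 1, 1, 12, 0, 0)
tracker = WatermarkTracker(timedelta(seconds=20))
a = tracker.observe(base + timedelta(seconds=30))  # watermark becomes 12:00:10
b = tracker.observe(base + timedelta(seconds=15))  # out of order but still above the watermark
c = tracker.observe(base + timedelta(seconds=5))   # at or below the watermark: late
```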
With the partial order property, we can combine windows to constrain join state size. Window functions like TUMBLE and HOP divide stream data into time windows. If the windowed field has watermark info, the optimizer can further deduce watermarks for the windowed timestamp column. As the name suggests, window joins perform joins within windows, requiring equal window start times between inputs. Thanks to watermarks, windows keep advancing and the state for windows that have already been joined can be cleaned up. This allows effective control of join state size.
CREATE SOURCE s2 (
id int,
value int,
ts TIMESTAMP,
WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) WITH (connector = 'datagen');
-- Create a materialized view using window join
CREATE MATERIALIZED VIEW window_join AS
SELECT s1.id AS id1,
s1.value AS value1,
s2.id AS id2,
s2.value AS value2
FROM TUMBLE(s1, ts, interval '1' MINUTE)
JOIN TUMBLE(s2, ts, interval '1' MINUTE)
ON s1.id = s2.id and s1.window_start = s2.window_start;
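The state cleanup enabled by the window join can be sketched as follows. This toy Python model uses integer seconds for timestamps and a single combined watermark for both inputs, which is a simplification; `TumbleWindowJoin`, `on_row`, and `on_watermark` are hypothetical names, not RisingWave internals.

```python
from collections import defaultdict

WINDOW = 60  # tumbling window size in seconds


def window_start(ts):
    """Start of the tumbling window that contains timestamp ts."""
    return ts - ts % WINDOW


class TumbleWindowJoin:
    """Toy window join whose state is dropped once a window is complete."""

    def __init__(self):
        # state[side][window_start][id] -> list of values
        self.state = {
            "left": defaultdict(lambda: defaultdict(list)),
            "right": defaultdict(lambda: defaultdict(list)),
        }

    def on_row(self, side, key, ts, value):
        other = "right" if side == "left" else "left"
        w = window_start(ts)
        self.state[side][w][key].append(value)
        # Only rows with the same id and the same window start can match.
        matches = self.state[other].get(w, {}).get(key, [])
        if side == "left":
            return [(value, m) for m in matches]
        return [(m, value) for m in matches]

    def on_watermark(self, watermark_ts):
        # Every window that ends at or before the watermark is complete,
        # so no future row can join with it and its state can be dropped.
        closed_before = window_start(watermark_ts)
        for tables in self.state.values():
            for w in [w for w in tables if w < closed_before]:
                del tables[w]

    def state_windows(self):
        return sorted(set(self.state["left"]) | set(self.state["right"]))


wj = TumbleWindowJoin()
r1 = wj.on_row("left", 1, 10, "l1")   # window [0, 60)
r2 = wj.on_row("right", 1, 50, "r1")  # same window and id: joins with l1
r3 = wj.on_row("right", 1, 70, "r2")  # window [60, 120): no match
wj.on_watermark(65)                   # window [0, 60) is complete and removed
```

After the watermark passes 60, only the state of the window starting at 60 remains, so the state size is bounded by the number of open windows rather than by the stream length.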
Interval Join
Suppose you are processing user clickstream data and want to join click events with purchase events. But these two events may not occur within strict window periods. In this case, an interval join would be more appropriate. It allows two events to join within a time interval instead of strict window periods. One may ask: why not just use a regular symmetric hash join without interval constraints? The reason is that in stream processing, keeping the size of the join state under control is a key objective. Interval joins leverage stream watermarks to efficiently clean up expired state, that is, rows that can no longer participate in any future join output. The syntax for interval join is as follows: it requires range constraints on the timestamp columns of both inputs in the join condition.
-- Create a materialized view using interval join
CREATE MATERIALIZED VIEW interval_join AS
SELECT s1.id AS id1,
s1.value AS value1,
s2.id AS id2,
s2.value AS value2
FROM s1 JOIN s2
ON s1.id = s2.id and s1.ts between s2.ts and s2.ts + INTERVAL '1' MINUTE;
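To see why the range constraint makes cleanup possible, consider the condition above: an s2 row with timestamp t can only match s1 rows whose timestamp lies in [t, t + 1 minute]. Once the s1 watermark reaches t + 1 minute, that s2 row can never match again. The Python sketch below works this out with integer seconds; it is a simplified model with hypothetical names (`IntervalJoin`, `on_left_watermark`, and so on), not RisingWave's operator.

```python
from collections import defaultdict

INTERVAL = 60  # seconds, mirrors INTERVAL '1' MINUTE in the query above


class IntervalJoin:
    """Toy join on: left.id = right.id AND left.ts BETWEEN right.ts AND right.ts + INTERVAL."""

    def __init__(self):
        self.left = defaultdict(list)   # id -> [(ts, value)] rows from s1
        self.right = defaultdict(list)  # id -> [(ts, value)] rows from s2

    @staticmethod
    def _matches(left_ts, right_ts):
        return right_ts <= left_ts <= right_ts + INTERVAL

    def on_left(self, key, ts, value):
        self.left[key].append((ts, value))
        return [(value, v) for t, v in self.right.get(key, []) if self._matches(ts, t)]

    def on_right(self, key, ts, value):
        self.right[key].append((ts, value))
        return [(v, value) for t, v in self.left.get(key, []) if self._matches(t, ts)]

    @staticmethod
    def _prune(table, keep):
        for key in list(table):
            kept = [(t, v) for t, v in table[key] if keep(t)]
            if kept:
                table[key] = kept
            else:
                del table[key]

    def on_left_watermark(self, wm):
        # Future left rows have ts > wm, so a right row can still match
        # only if right.ts + INTERVAL > wm.
        self._prune(self.right, lambda t: t + INTERVAL > wm)

    def on_right_watermark(self, wm):
        # Future right rows have ts > wm, and a match needs right.ts <= left.ts,
        # so a left row can still match only if left.ts > wm.
        self._prune(self.left, lambda t: t > wm)


ij = IntervalJoin()
m1 = ij.on_right(1, 100, "r100")  # nothing on the left yet
m2 = ij.on_left(1, 130, "l130")   # 100 <= 130 <= 160: match
m3 = ij.on_left(1, 200, "l200")   # 200 > 160: no match
ij.on_left_watermark(170)         # r100 expired (100 + 60 <= 170)
ij.on_right_watermark(150)        # l130 expired, l200 kept
```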
Temporal Join
The joins mentioned above treat both inputs equally, which is costly because hash tables must be maintained as state for both inputs. Traditional database hash joins only build a hash table on one side. To improve performance, one idea is to break the symmetric relationship between inputs. RisingWave offers temporal join, which can join one stream against a table or materialized view. The stream side maintains no state. When data comes in, it directly queries the table side, and the table itself serves as the join state. In this model, the temporal join operator maintains no state of its own, so it can be very efficient. But there is no free lunch - unlike symmetric hash join, data changes on the table side do not affect previously emitted join outputs. Also, to ensure consistency, the stream side must be append-only, meaning only inserts, no updates or deletes. Below is the syntax for temporal join, with a special FOR SYSTEM_TIME AS OF PROCTIME() clause. RisingWave keeps batch and streaming query results consistent for the same query, but the semantics of temporal join cannot be expressed in regular SQL, hence the special syntax.
-- Create a table
CREATE table t1 (
id int primary key,
value int
);
-- Create a materialized view using temporal join
CREATE MATERIALIZED VIEW temporal_join AS
SELECT s1.id AS id1,
s1.value AS value1,
t1.id AS id2,
t1.value AS value2
FROM s1 JOIN t1 FOR SYSTEM_TIME AS OF PROCTIME()
ON s1.id = t1.id;
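The trade-off of temporal join can be shown with a small Python sketch: the operator keeps no state of its own and looks up the table as of processing time, so later table updates do not revise earlier outputs. The class `TemporalJoin` and the plain dict standing in for table t1 are assumptions made for illustration.

```python
class TemporalJoin:
    """Toy temporal join: the stream side is stateless and probes the table."""

    def __init__(self, table):
        # The table (id -> value) is owned elsewhere and doubles as the join state.
        self.table = table

    def on_stream_row(self, key, value):
        # Look up the table as it is at processing time.
        if key in self.table:
            return [(key, value, key, self.table[key])]
        return []


t1 = {1: 100}
tj = TemporalJoin(t1)
out_before = tj.on_stream_row(1, "a")  # sees value 100
t1[1] = 200                            # table update: nothing is re-emitted
out_after = tj.on_stream_row(1, "b")   # sees value 200
miss = tj.on_stream_row(2, "c")        # no matching table row
```

The first output still carries the old table value 100 after the update, whereas a symmetric hash join would have retracted and re-emitted it.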
Join Ordering
Join ordering is a crucial aspect when discussing joins. Extensive research has been conducted in traditional databases on optimizing join ordering. One key idea is to use a cost-based optimizer (CBO) to enumerate the plan search space, estimate data volumes for each operator using statistics, calculate total plan cost, and pick the plan with the lowest cost. But in streaming databases, join inputs are continuous real-time streams with unknown data volumes and distributions. Some research papers propose rate-based join ordering[2], mainly estimating output stream rates after joins using input stream rates and finding the join order that maximizes average output rate.
If you are interested, you can delve into the paper, but here I’d like to offer my conclusions. The findings suggest that the cost of directly and accurately calculating the average output rate is relatively high. Therefore, a heuristic algorithm is needed to strike a balance between plan quality and optimization speed. Placing faster streams closer to the join output, farther away, or in the middle can all be optimal depending on the shape of the join tree. An intriguing observation is that bushy trees appear to be better suited for stream processing than left-deep trees. Just by shape analysis, we can see that bushy trees allow data to flow in parallel through the entire join tree, and compared to left-deep trees, each data row travels from the bottom to the top through fewer joins and therefore with lower latency. We also ran experiments which confirmed that bushy trees utilize resources better, increasing throughput and reducing latency. Therefore, RisingWave's current join ordering algorithm tries to make the tree as bushy as possible while minimizing tree height.
Finally, I want to mention that not all joins can be converted to bushy trees. In many cases, the SQL queries from users are just many left joins chained into a left-deep tree shape, which cannot be optimized into a bushy tree. In these cases constrained to a left-deep tree, a better approach is to place faster streams closer to the output.
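The shape argument can be checked with a short Python sketch that builds a left-deep and a balanced bushy tree over the same inputs and compares their heights. It deliberately ignores which inputs actually share join conditions and says nothing about RisingWave's real ordering algorithm; the helper names `left_deep`, `bushy`, and `height` are made up for this example.

```python
def left_deep(inputs):
    """Chain the inputs as ((((a, b), c), d), ...)."""
    tree = inputs[0]
    for name in inputs[1:]:
        tree = (tree, name)
    return tree


def bushy(inputs):
    """Split the inputs in half recursively to get a balanced tree."""
    if len(inputs) == 1:
        return inputs[0]
    mid = len(inputs) // 2
    return (bushy(inputs[:mid]), bushy(inputs[mid:]))


def height(tree):
    """Maximum number of join operators a row passes through on its way to the root."""
    if isinstance(tree, str):
        return 0
    return 1 + max(height(tree[0]), height(tree[1]))


streams = ["a", "b", "c", "d", "e", "f", "g", "h"]
left_deep_height = height(left_deep(streams))
bushy_height = height(bushy(streams))
```

For eight inputs, a row entering at the bottom of the left-deep tree passes through seven joins, while in the balanced bushy tree every row passes through three.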
More
Subqueries
The joins mentioned above are mostly inner and outer joins. RisingWave join implementation also supports semi-joins and anti-joins, but they cannot be directly expressed in join syntax and need to be written as correlated subqueries. RisingWave's subquery unnesting follows the paper Unnesting Arbitrary Queries[3]. All subqueries are converted to apply operators, and apply operators are pushed down until all correlations are rewritten to normal references and can be converted to joins. So in RisingWave, subqueries are also considered as a form of join.
Nested Loop Joins
The joins discussed so far all have equijoin conditions. For joins without any equi conditions, we clearly cannot use symmetric hash join. Currently, nested loop joins are prohibited in RisingWave streaming queries because they are extremely inefficient in stream processing - any change on either side requires comparing against all data on the other side to compute incremental results.
However, some special types of non-equi join conditions can actually be quite efficient. RisingWave has a special operator called dynamic filter, originally proposed to handle nested loop join scenarios caused by non-correlated subqueries, as in TPC-H Q11. What makes this case special is that one side of the join has just one continuously changing row, produced by a non-grouped aggregation, and the join condition is a range condition. In this case, changes on both sides can be handled efficiently via a relatively small range query to achieve incremental computation.
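The idea behind this can be sketched in Python for a condition of the form `value > threshold`, where the threshold is the single changing row. Keeping the other side sorted means a threshold change only touches the rows between the old and new threshold. This is a simplified model with hypothetical names (`DynamicFilter`, `on_left_insert`, `on_threshold_change`), not the actual operator.

```python
import bisect


class DynamicFilter:
    """Toy filter for the condition value > threshold, with a changing threshold."""

    def __init__(self, threshold):
        self.values = []            # left-side values, kept sorted
        self.threshold = threshold  # the single row from the other side

    def on_left_insert(self, value):
        bisect.insort(self.values, value)
        # A new left row only needs to be compared against the current threshold.
        return [("+", value)] if value > self.threshold else []

    def on_threshold_change(self, new_threshold):
        old, self.threshold = self.threshold, new_threshold
        lo, hi = min(old, new_threshold), max(old, new_threshold)
        # Only rows with lo < value <= hi change their pass/fail status,
        # so a range scan over the sorted state is enough.
        start = bisect.bisect_right(self.values, lo)
        end = bisect.bisect_right(self.values, hi)
        op = "+" if new_threshold < old else "-"
        return [(op, v) for v in self.values[start:end]]


df = DynamicFilter(threshold=10)
e1 = df.on_left_insert(5)          # 5 > 10 is false: no output
e2 = df.on_left_insert(12)         # passes
e3 = df.on_left_insert(20)         # passes
e4 = df.on_threshold_change(15)    # 12 no longer passes: retract it
e5 = df.on_threshold_change(3)     # 5 and 12 pass now: emit them
```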
Delta Join
From the above, we know join is a stateful operator with its own join state to maintain. If multiple queries use the same table input with the same join keys, we can reuse its state. We have discussed this topic in a previous article, Shared Indexes and Joins in Streaming Databases, so we won't go over it again here.
But some are not implemented yet…
Multi-Way Joins
The joins discussed so far are assumed to be binary - two inputs per join operator. For N-way joins we would need N-1 join operators, while a multi-way join can handle multiple inputs in one operator. In traditional databases, binary joins actually work very well for most scenarios, leaving multi-way joins to handle some very specific cases, such as the blow-up of intermediate results with binary joins[4]. The advantage of multi-way joins in streaming would be reduced join latency - results that previously spanned multiple operators can now be completed in one multi-way operator. However, multi-way joins don't have streaming-friendly scaling approaches - either all streams converge on one node, or N-1 streams are broadcast to the hashed stream, or more complex hypercube shuffles are introduced.
Fast and Slow Streams
Stream data has varying speeds - if one input stream is much faster than the other, can we take advantage of this asymmetry in the join state data structures? Some papers[5] look at only indexing the slower side with hash tables when joining fast and slow streams.
References
[1] Xie J, Yang J. A survey of join processing in data streams[J]. Data Streams: Models and Algorithms, 2007: 209-236.
[2] Viglas S D, Naughton J F. Rate-based query optimization for streaming information sources[C]//Proceedings of the 2002 ACM SIGMOD international conference on Management of data. 2002: 37-48.
[3] Neumann T, Kemper A. Unnesting arbitrary queries[J]. Datenbanksysteme für Business, Technologie und Web (BTW 2015), 2015.
[4] Freitag M, Bandle M, Schmidt T, et al. Adopting worst-case optimal joins in relational database systems[J]. Proceedings of the VLDB Endowment, 2020, 13(12): 1891-1904.
[5] Kang J, Naughton J F, Viglas S D. Evaluating window joins over unbounded streams[C]//Proceedings 19th International Conference on Data Engineering (Cat. No. 03CH37405). IEEE, 2003: 341-352.
Conclusion
RisingWave is a cloud-native SQL streaming database that has done extensive work on state management, reuse, and performance optimizations for stream processing joins. This article introduced RisingWave join use cases, basic stream join principles and join state characteristics. It also covered using watermarks to constrain join state size. RisingWave provides symmetric hash join, interval join, temporal join, delta join and other join features. RisingWave has implemented join ordering optimization tailored for streaming scenarios, comprehensive subquery unnesting to convert subqueries to joins, and dynamic filter optimization for special nested loop join cases.