Why We Ditched Flink Table API Joins: Cutting State by 75% with DataStream Unions


Maryna Kryvko

Senior Software Engineer

Posted on Mar 04, 2026

Photo by Włodzimierz Jaworski on Unsplash

The beauty of a high-level abstraction is that it lets you focus on the "what" rather than the "how." In the world of Apache Flink, the Table API and SQL represent this convenience very well. You write a simple join statement, and the query optimizer handles the heavy lifting. It feels like magic - until that magic starts costing you thousands of dollars in AWS bills and crashing your clusters every time a snapshot is triggered.

This is exactly what we faced with our Product Offer Enrichment applications at Zalando. What began as an elegant, declarative solution eventually started crumbling under the weight of its own state. By moving from the "magic" of SQL to the manual control of the DataStream API and a custom MultiStreamJoinProcessor, we managed to decrease our state size from 240GB to 56GB, a 75% improvement.

Here is the deep dive into why Flink SQL state accumulates, how joins actually work under the hood, and how we rewrote our job to save the applications.

Disclaimer: this article is about Flink 1.20, which is the only version of Flink currently (Feb 2026) available on AWS Managed Flink.

The Initial Architecture: The Attraction of SQL

Our Product Offer Enrichment pipeline is a critical piece of the Zalando Search and Browse ecosystem. It is responsible for joining multiple streams of differing speed and "weight", including data about pricing and stock offers from partners, sorting metadata we call Boost, sponsored products metadata, and product data - to create a unified, enriched view of what a customer sees on the site when browsing a catalog of articles.

Figure 1: Catalog of articles on the Zalando website, fed by the Offer Enrichment pipeline.

Initially, we used the Table API & SQL. It allowed us to express complex joins in a few lines of SQL code. However, to understand why it failed, we have to look at how Flink SQL manages stateful joins.
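For illustration, here is a much-simplified, hypothetical sketch of what the declarative version looked like (table and column names are invented for this example, not our production schema):

```scala
// Hypothetical Table API version: four streams joined on the SKU.
// At runtime, each JOIN below becomes its own independent stateful operator.
val enriched = tableEnv.sqlQuery(
  """SELECT o.sku, o.price, o.stock, b.sorting_score, s.campaign, p.product_state
    |FROM offers o
    |LEFT JOIN boosts b    ON o.sku = b.sku
    |LEFT JOIN sponsored s ON o.sku = s.sku
    |LEFT JOIN products p  ON o.sku = p.sku""".stripMargin)
```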

Why State Amplifies

In Flink 1.20, each join operator is a strictly independent unit. Because Flink must account for late-arriving data and potential updates, it maintains data integrity by keeping every record in its internal state (RocksDB). When you chain four joins together, you aren't just adding state; you are multiplying it. Each join operator in the chain maintains its own copy of the keys and values it needs.

The State Math

  1. Join Operator 1 (offer + boost): Flink stores all records from offer and boost in RocksDB.
  2. Join Operator 2 (operator 1 + sponsored): To this operator, the incoming joined record is just a new stream. It has no access to the previous operator's memory. It must store its own copy of the (offer+boost) data to join it with the sponsored metadata.
  3. Join Operator 3 (result of 2 + product event): It clones the previous results again.

The relational model treats these as isolated operations. This state amplification led us to a staggering 235–245GB of state per application.
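To make the amplification concrete, here is a back-of-envelope model. The per-stream sizes below are invented for illustration, not our real numbers:

```scala
// Toy model of state amplification with chained binary joins.
// Per-stream raw sizes in GB -- invented numbers, for illustration only.
val offer = 50.0; val boost = 10.0; val sponsored = 5.0; val product = 40.0

// Each join operator stores both of its inputs, and every downstream
// operator re-stores the upstream join result as its own input:
val join1 = offer + boost                         // 60 GB
val join2 = (offer + boost) + sponsored           // 65 GB: join1's output stored again
val join3 = (offer + boost + sponsored) + product // 105 GB: stored yet again
val chainedTotal = join1 + join2 + join3          // 230 GB across the pipeline

// A single union-based operator stores each stream's data exactly once:
val unionTotal = offer + boost + sponsored + product // 105 GB
```

In practice, the union version shrinks further still, because the single operator can keep only a dense object with the fields needed for enrichment, rather than the full records.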

The State Nightmare

When your state reaches 235GB of data, your application stops being a data pipeline and starts being an unstable nightmare.

Every hour, a cronjob would trigger a snapshot (savepoint). For us, it was a catastrophe:

  • CPU exhaustion: To snapshot 235GB, Flink must iterate over the RocksDB state, serialize it, and move it to S3. This would keep the cluster's CPU at 100% for nearly 12 minutes.

Figure 2: CPU spikes during snapshot creation
  • Backpressure: Because the application was running close to the CPU limit, it couldn't process records. Consumer lag would climb higher and higher.
  • Crash-Restart Loop: Often, the Flink application would simply give up and restart. Because Flink restarts involve reloading the state from S3, we would sometimes fall behind our 1-hour SLA. By the time the app was back up, it would be almost time for the next snapshot. We tried lengthening the snapshot intervals to several hours, but that would have set us up for an SLO breach if an application failure had required a restore from the snapshot.

Figure 3: Flink restarts due to snapshots
  • Snapshot Failures: Due to forced restarts, many snapshots simply couldn't be taken. This left us vulnerable again, because our data backups were unreliable.
  • Overscaling: Every scaling operation on a Flink application involves a full job restart, because the job parallelism needs to be reconfigured, and that can't happen on a live application. Since the STOP operation involves creating a snapshot (a configurable setting in AWS Managed Flink that we had enabled), every scaling took time proportional to snapshot creation: 11–12, sometimes up to 20 minutes. Because of that, we constantly kept the application's parallelism 10–20% higher than actually required, to provide some margin for intake spikes and to make sure restarts didn't happen too often. They were just costing us too much lag!
  • Cost: Since the AWS Managed Flink costs are proportional to the number of KPUs, that is, the job parallelism, we were paying for the huge state with very real, physical money.

Note: Managed Service for Apache Flink provisions capacity as KPUs. A single KPU provides you with 1 vCPU and 4GB of memory. For every KPU allocated, 50GB of running application storage is also provided. This means that the application resources are always configured in terms of KPUs, there's no way to allocate more storage without also allocating more CPU and memory, or more memory without also allocating more CPU and storage.
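This coupling matters in practice: storage requirements alone can set a floor on the KPU count, and with it on CPU, memory, and cost. A quick illustration (the arithmetic is ours, using the bundle sizes above):

```scala
// KPU sizing illustration for AWS Managed Flink.
// Each KPU bundles 1 vCPU, 4 GB of memory, and 50 GB of application storage.
val stateGb = 235.0
val storagePerKpuGb = 50.0
val minKpusForStorage = math.ceil(stateGb / storagePerKpuGb).toInt // 5 KPUs just to hold the state
```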

Figure 4: Failed automated snapshots

From Table API to Stream API

In the end, we made a decision to move from the Table API (declarative) to the DataStream API (imperative). This approach is very different from a simple SQL statement, but it gives way more control over what's happening.

The MultiStreamJoinProcessor

We moved to a Stream Union approach. Instead of chaining joins, we unified all incoming streams into a single DataStream[BaseEvent]. This allowed us to replace the chain of joins with ever-growing state with a single KeyedProcessFunction: our custom MultiStreamJoinProcessor. The key in this case is the SKU, the product identifier, which is the common key across all streams.
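The wiring might look roughly like this. It is a sketch, not our production code: it assumes each source has already been mapped up to DataStream[BaseEvent], a common trait exposing the SKU; only MultiStreamJoinProcessor is named in the text, the rest of the identifiers are illustrative.

```scala
// Sketch of the union-based topology. Assumes offerEvents, boostEvents,
// sponsoredEvents, and productEvents are already DataStream[BaseEvent].
val unified: DataStream[BaseEvent] =
  offerEvents.union(boostEvents, sponsoredEvents, productEvents)

val enriched: DataStream[EnrichedOffer] = unified
  .keyBy(_.sku)                          // SKU is the common key across all streams
  .process(new MultiStreamJoinProcessor) // single operator, single copy of state
```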

Exact State Management

In this model, there is only one instance of the SKU data in RocksDB. We use a single ValueState for a custom Scala POJO called EnrichmentState. This state management approach has several advantages:

  • No multiplication: There is no left or right part of the join. When an event arrives, it simply updates the specific field(s) in the existing ValueState object.
  • No TTLs: We keep the state "forever" to always have the last known value for an SKU. However, because we only store it once, the state is significantly smaller.
  • Filtering/deduplication built-in: We implemented manual stream filtering. If an incoming update has a timestamp earlier than the one we already have, we drop it immediately, avoiding a state write altogether. In some cases, it would also be an option to compare incoming events with the already-stored ones, using a relevant subset of fields, to suppress updates from events that don't contain relevant changes.

DataStream code example

This is obviously not the real code; just a much simplified example to illustrate the options. The code is written in Scala.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// 1. Define a unified, dense POJO, containing only the fields we need for the enrichment, and a single state object to keep it in.

case class EnrichmentState(
  var price: Double = 0.0,
  var stock: Double = 0.0,
  var sortingScore: Double = 0.0,
  var boostTimestamp: Long = 0L, // needed for the freshness check on BoostEvent below
  var productState: String = null
)

// 2. The Processor Logic
// Type parameters are: key type, input type, return type
class MultiStreamJoinProcessor extends KeyedProcessFunction[String, BaseEvent, EnrichedOffer] {
  private var state: ValueState[EnrichmentState] = _
  override def open(parameters: Configuration): Unit = {
    state = getRuntimeContext.getState(new ValueStateDescriptor("enriched-state", classOf[EnrichmentState]))
  }

  override def processElement(event: BaseEvent, ctx: Context, out: Collector[EnrichedOffer]): Unit = {
    val current = Option(state.value()).getOrElse(EnrichmentState())
    event match {
      case o: OfferEvent =>
        // Deduplication possible: Only update if price or stock changed
        if (o.price == current.price && o.stock == current.stock) return
        current.price = o.price
        current.stock = o.stock
      case p: ProductEvent =>
        // Deduplication possible: Only update if product state changed
        if (p.productState == current.productState) return
        current.productState = p.productState
      case b: BoostEvent =>
        // Built-in filtering: Only update if the boost event is newer than the one we have
        if (b.timestamp <= current.boostTimestamp) return
        current.sortingScore = b.sortingScore
        current.boostTimestamp = b.timestamp
    }
    state.update(current)
    out.collect(EnrichedOffer(ctx.getCurrentKey, current))
  }
}

Funnily enough, the "more manual" approach turned out to be even less verbose than the SQL version, because our SQL was quite complex: it needed aggregations to compute the maximal timestamps across several parts of the join, and ranking functions to ensure the last record from each part of the join always wins.

Result: 75% State Decrease

The results were immediate and impressive. We reduced the number of operators and cut the state to a quarter of its previous size. The snapshot size and duration dropped proportionally. The restart time dropped from 12–20 minutes to 4–5 minutes, so the lag, while still accumulating during restarts, no longer threatened our SLO.

| Metric            | Table API (SQL) | DataStream API | Improvement |
| ----------------- | --------------- | -------------- | ----------- |
| State Size        | 235GB           | 56GB           | -76%        |
| Snapshot Duration | 11 minutes      | 2.5 minutes    | -77%        |
| CPU Usage         | 100% (spikes)   | ~30% (stable)  | Stability   |
| AWS Costs         | Baseline        | 13% reduction  | Savings     |

So how come we didn't cut the cost by 75% as well? Because AWS Managed Flink costs are driven less by state size than by CPU and memory resources. We did save some CPU capacity, but memory usage didn't change much, because we still needed to keep the same amount of data in memory for processing. CPU usage became more stable, but it wasn't reduced by 75%, because the processing logic still had to do the same amount of work on the same throughput. It just didn't have to deal with the overhead of managing multiple states.

Because of that, what we saved was mostly the cost of the previous overscaling. The applications had been running at 10–20% higher parallelism than needed, so we could reduce that and save some money, but the cost reduction was not directly proportional to the state reduction. Still, as Flink optimizations go, a 13% cost reduction is a very good result, especially considering that it also made the applications more stable and reliable.

What Comes in Flink 2.x

As already mentioned above, we are currently on Flink 1.20 because that is the only option on AWS Managed Flink. So one might say, oh, but Flink 2.x has this and that, and you probably wouldn't have to do all this work, and maybe it was all for nothing.

That turns out to be partly true: the Flink community was very much aware of the issue, and an improvement proposal dated May 19, 2025, called the Multi-Way Join Operator, addresses exactly this. It was introduced in Flink 2.1 as an experimental feature.

Disclaimer in the Flink documentation: "This is currently in an experimental state - there are open optimizations and breaking changes might be implemented in this version. We currently support only streaming INNER/LEFT joins. Support for RIGHT joins will be added soon."

The MultiJoin improvement still wouldn't save us, because it ships in a much later version of Flink than we're on, but it is interesting to see that we reimplemented the same idea: keyed state, one operator for all streams, no intermediate state. The benchmark for the Flink 2.1 implementation shows impressive results: a 2x to over 100x increase in processed records, and 3x to over 1000x smaller state. I guess this feature alone will be worth the wait when we get there, but until then, we're covered by our home-baked solution.

When to Go Low-Level

By making state management explicit, we turned an overloaded, barely coping system into a reliable, high-performance machine.

Flink SQL is perfect for 90% of use cases - it's fast, elegant, and maintainable. But a software engineer's value is in recognizing the remaining 10%: the use cases where the abstraction starts costing too much.

And this was definitely one of those.


We're hiring! Do you like working in an ever evolving organization such as Zalando? Consider joining our teams as a Software Engineer!


