Co-authored by Sumit Kumar
At Meesho, we help lakhs of entrepreneurs in Bharat sell their items online. In numbers, we have over 50 Mn products in our catalog, with more being added every day. New items arrive, prices change, and products go out of stock.
To track and display all these changes in real-time and ensure that the information in search results and user feeds stays accurate, we have to index all this data. However, any delays or errors during indexing mean that the information in search results and user feeds goes out of date.
Often, this means that by the time a user adds an item to the cart, the item’s price might have changed or already be out of stock. When this happens, users often get frustrated and abandon their carts, or worse, sometimes stop shopping at Meesho altogether — all of this is Bad For Business™️.
To provide a better experience for our users, we need fast, reliable, and scalable indexing. In this series of articles, we’ll describe in detail how we started off indexing products, the hurdles we faced while scaling the old architecture, and how we implemented a horizontally scalable indexing architecture using Apache Flink.
How we started: scheduler-based indexing
Around 3 years back, we chose crontab — a tried and tested tool to schedule our indexing jobs. At the time, it was a good solution because the number of products was low, and it was relatively quick and simple to implement. The results were satisfactory, too — it usually took 5-7 minutes to index all the products, and we scheduled the indexing job to run every 10 minutes. Life was good.
Issues with scheduler-based indexing
This solution worked for a while. However, over time, the number of sellers on Meesho increased rapidly, and with it, the number of products, now in the tens of lakhs. The legacy components inherent to scheduler-based solutions couldn't keep up with this enormous increase in scale, and the run time for each job ballooned to 4 hours!
This increased run time resulted in data lag, leading to users sometimes seeing old prices or out of stock items in their feed — something we wanted to avoid as much as possible. Such lag was especially noticeable during monthly and festival sales.
Some other issues with this approach:
- High indexing rate led to constant disk writes and greatly increased the CPU usage on the ES (Elasticsearch) cluster.
- Resource intensive: every run increased the load on downstream databases and services for all products, with downstream services being called constantly at very high TPS (transactions per second).
- Overlapping runs recomputed all product data whether an update happened or not, adding to the already high CPU and disk usage.
- In case of a failed job, we had to wait for the next successful run to get a fresh index, as we couldn’t use a partial one. Depending on when the job failed, this caused anywhere from 4 to 8 hours of delay.
- Periodically deleting old indices in ES (Elasticsearch) resulted in frequent garbage collection cycles.
Interim solution: Kafka consumers
While we wanted to implement a permanent solution, such a large structural change requires a lot of time and planning. Since the database and traffic kept growing, we needed a quick interim fix to keep up. To that end, we made two changes:
- Parallelised writes to Elasticsearch using Kafka consumers for better horizontal scalability: this sped up the indexing process, though at the cost of higher CPU usage. Our ES write time went from ~2 hours to ~15 minutes, and overall indexing time dropped to around 3 hours (a rough sketch of one such consumer appears below).
- Moved from percentile-based indexing logic to absolute value-based logic, since computing percentiles wasn’t scalable: a change in one item required reindexing the entire product catalogue. Dropping the percentile logic also allowed batch processing and reduced memory consumption, since catalogs were sent directly to Kafka and indexed immediately.
Flowchart of our interim solution
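To make the interim setup concrete, here is a minimal sketch of what one such consumer could look like. The topic name, index name, and connection details are hypothetical, and we assume product updates arrive as pre-serialised JSON keyed by product ID; it uses the Kafka consumer API and the Elasticsearch 7.x high-level REST client, and is an illustration of the pattern rather than our production code.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class ProductIndexConsumer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "product-indexer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             RestHighLevelClient es = new RestHighLevelClient(
                     RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            // Hypothetical topic carrying product JSON keyed by product ID.
            consumer.subscribe(Collections.singletonList("product-updates"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    continue;
                }

                // Batch every polled record into one bulk request instead of
                // making a network call per document.
                BulkRequest bulk = new BulkRequest();
                for (ConsumerRecord<String, String> record : records) {
                    bulk.add(new IndexRequest("products")
                            .id(record.key())
                            .source(record.value(), XContentType.JSON));
                }
                es.bulk(bulk, RequestOptions.DEFAULT);

                // Commit offsets only after the bulk write succeeds.
                consumer.commitSync();
            }
        }
    }
}
```

Running several instances of such a consumer in the same consumer group lets Kafka spread partitions across them, which is where the horizontal scalability of this interim fix comes from.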
However, not everything was rosy — we were still computing and generating every catalog from scratch instead of doing incremental changes. Within a month, further increases in product numbers caused the indexing time to increase to 4 hours, bringing us back to square one.
Requirements for the solution
We needed a low-latency, fault-tolerant, and horizontally scalable solution capable of:
- Automatically handling lag through back-pressure to avoid overloading the database and downstream services in the data flow pipeline.
- Decoupling reads and writes by separating the indexing process and enrichment pipeline.
- Providing an extensible, future-proof plug-and-play architecture for our data flow. We needed to support multi-consumer queues which would write into (our beloved) Elasticsearch.
- Processing events in seconds, rather than the hours that our old scheduler-based approach managed.
Data flow model requirements
Additionally, we had some data flow-specific requirements. We needed a solution that could:
- Consume a variety of events such as inventory updates, pricing updates, new products, etc. to give our users the latest information.
- Treat some events as refresh events, while letting others update the existing product data with a delta rather than writing a new document every time, for better performance.
- Set up Change Data Capture (CDC) events using Debezium on relational tables where application events were unavailable.
- Update out-of-stock items as quickly as possible.
- Provide performance optimizations such as bulk API calls instead of making a network request on each update.
- Provide controllable output for all the data flowing into our DBs.
- Create re-usable, future-proof data streams that can branch out and work with data pipeline operations such as map-reduce.
- Provide windowing capabilities across events based on configurable parameters such as time and counts.
- Prioritise and group event streams in the form of priority queues: we don’t want to show out-of-date information on the app, but it’s okay if some updates are processed a little slower (a rough sketch of such routing follows this list).
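To illustrate the last point, here is a rough sketch of how priority-based routing could look on the producer side. The topic names and event types are hypothetical, and this is one possible way to express the idea rather than a description of our actual pipeline: stock and price changes go to a high-priority topic that consumers drain first, while slower-moving updates tolerate a little extra lag.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProductEventRouter {

    private final KafkaProducer<String, String> producer;

    public ProductEventRouter(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    /**
     * Routes an event to a hypothetical high- or low-priority topic.
     * Stock and price changes must reach the index quickly; other
     * updates can tolerate a little extra lag.
     */
    public void publish(String productId, String eventType, String payloadJson) {
        String topic = "OUT_OF_STOCK".equals(eventType) || "PRICE_UPDATE".equals(eventType)
                ? "product-events-high"
                : "product-events-low";
        producer.send(new ProducerRecord<>(topic, productId, payloadJson));
    }

    public void close() {
        producer.close();
    }
}
```

A consumer fleet can then dedicate more capacity to the high-priority topic without getting stuck behind the long tail of low-priority updates.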
Our solution: scalable enrichment pipelines using Apache Flink
We evaluated and compared many potential solutions and ultimately settled on Apache Flink. Flink provides simple high-level APIs such as map-reduce, filters, windows, groupBy, sort, and joins. It also provides high availability, using ZooKeeper for primary and standby coordination.
Our planned Flink-based realtime indexing architecture
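To give a flavour of what these APIs look like in practice, here is a minimal, self-contained DataStream sketch. The ProductUpdate type, the sample values, and the five-second window are hypothetical stand-ins rather than our production topology: the pipeline filters malformed events, keys the stream by product ID, collects a short window of updates per product, and keeps only the latest one, which is the kind of windowed de-duplication our requirements call for.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ProductUpdateWindowSketch {

    // Hypothetical event type; real events carry far more fields.
    public static class ProductUpdate {
        public String productId;
        public double price;

        public ProductUpdate() {}

        public ProductUpdate(String productId, double price) {
            this.productId = productId;
            this.price = price;
        }

        @Override
        public String toString() {
            return productId + " -> " + price;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Demo-only bounded source; in a real pipeline this would be a Kafka
        // source of pricing and inventory update events.
        DataStream<ProductUpdate> updates = env.fromElements(
                new ProductUpdate("p1", 199.0),
                new ProductUpdate("p2", 349.0),
                new ProductUpdate("p1", 189.0));

        updates
                .filter(update -> update.price > 0)                         // drop malformed events
                .keyBy(update -> update.productId)                          // group the stream by product
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // collect 5 seconds of updates
                .reduce((older, newer) -> newer)                            // keep only the latest update per product
                .print();                                                   // stand-in for an Elasticsearch sink

        env.execute("product-update-window-sketch");
    }
}
```

With an unbounded Kafka source in place of fromElements, the same topology keeps firing windows continuously (with this tiny bounded demo source, the job may finish before a processing-time window closes), and print() would be replaced by a bulk Elasticsearch sink.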
Here’s a comparison of the frameworks that we evaluated:
| Name | Record acks (Storm) | Micro-batching (Spark Streaming, Trident) | Transactional updates (Google Cloud Dataflow) | Distributed snapshots (Flink) |
| --- | --- | --- | --- | --- |
| Guarantee | At least once | Exactly once | Exactly once | Exactly once |
| Latency | Very low | High | Low (transaction delay) | Very low |
| Throughput | Low | High | Medium to high | High |
| Computation model | Streaming | Micro-batching | Streaming | Streaming |
| Overhead of fault tolerance mechanism | High | Low | Depends on throughput of distributed transactional store | Low |
| Flow control | Problematic | Problematic | Natural | Natural |
| Separation of application logic from fault tolerance | Partial | No | Yes | Yes |
For our use case, Flink ticked all the boxes that most other frameworks didn’t — low latency, high throughput, low overhead of fault tolerance, and natural flow control. As a bonus, Flink has a large and active community, and new features, performance, and bug fixes are released regularly.
The numbers, compared
Of course, what looks good on paper isn’t always great in practice — we needed concrete numbers. Therefore, we obtained benchmarks for throughput numbers (source) for Flink and Storm. Here’s what we found:
| Streaming architecture | Average throughput (Mn/sec) @ 30 nodes, 120 cores |
| --- | --- |
| Flink, no fault tolerance (0 ms latency @ p99) | 182 |
| Flink, 5 sec checkpoints (0 ms latency @ p99) | 178 |
| Storm, no fault tolerance (11 ms latency @ p99) | 9.8 |
| Storm, fault tolerance active (30-120 ms latency @ p99) | 0.57 |
| Storm with Trident, fault tolerance active (3000 ms latency @ p99) | 9 |
Some highlights:
- For simple tasks run in parallel, without network shuffling or message grouping, Flink achieved over 300 times the throughput of Storm with fault tolerance active (182 vs 0.57 Mn/sec) and ~20 times the throughput of Storm with the Trident extension.
- For tasks that did involve network shuffling and message grouping, Flink achieved ~250 times higher throughput with checkpointing and ~280 times higher throughput without checkpointing, compared to Storm.
With these benchmarks in hand, we set out to integrate Flink into our architecture. In the upcoming articles in this series, we’ll look at how we implemented our solution using Flink, what we gained, and the problems we faced during the process, so stay tuned!
In the meantime, if you like what you’ve read so far, make sure to check out our careers page and join our team! Together, we can build solutions to democratise e-commerce in Bharat for the next billion users.