2025-06-04 | DjangoCon Europe 2025 | Bulletproof Data Pipelines: Django, Celery, and the Power of Idempotency

Building Reliable Data Pipelines: Idempotency in Practice with Django and Celery

Media Details

Upload Date
2025-06-21 18:06
Source
https://www.youtube.com/watch?v=fjcNUE_7meU
Processing Status
Completed
Transcription Status
Completed
Latest LLM Model
gemini-2.5-pro

Transcript

speaker 1: Hey everyone, I'm Ricardo, and it's okay, there are many different pronunciations for this word: idempotency, "eye-dem-potency", a lot of different pronunciations. Before we start, just a quick question: by any chance, were any of you here at last year's DjangoCon? Because there was a talk by Jake Howard, if I'm not wrong, about background workers, and it was a terrific talk. Very good, I truly recommend you watch it; if you missed it, it's available on YouTube. I'm mentioning this because I believe this talk is a spiritual follow-up to it, because we will be talking about Celery (but it could be any kind of message queue system, actually) and how to make it resilient enough while still managing complexity, so it doesn't explode in our faces. But okay. Before we start, as I said, I'm Ricardo. I was born and raised in Recife, Brazil, in the northeast of Brazil. I spent some time in Porto, here in Europe, in Portugal. I'm a software engineer at Vinta Software, a company focused on software projects, basically fully focused on Python and everything Django related. And I'm constantly talking about architecture, both real-world and software architecture, maybe because my partner is an architect, a real-world architect who builds things. So I just like to talk about it, and this whole talk is just an excuse for me to talk about architecture for like 30 minutes. But okay, what is this talk, actually? This talk is a thought experiment. It's based on a real-world scenario that happened to me and my team, so we'll be following some strategies that we used and explaining how we did it. But most importantly, it is a thought experiment for better understanding parallelization and software architecture, and also how we can refactor things in an organic way. We start with a piece of code that is just bad, a method that breaks every so often and isn't very resilient, and we will iterate on it, making it better bit by bit, until at the end of the talk we have something somewhat okay, but still with room for improvement. And welcome to DjangoVids. DjangoVids is a video platform aimed at the Django community, where everyone can upload their tutorials and videos. It's a new platform with hundreds of users, new ones coming every day, and thousands of videos. It is a very strong community and new uploads are frequent on the platform, occurring every minute. For each video, we run an enhancing pipeline where every frame of the video gets better sharpness, better contrast, better brightness, something like that. And the DjangoVids team is saying that scaling is hard: this video enhancement pipeline is getting out of hand, it's taking too much time to actually do its work, and scaling is hard. And that's normal, right? I mean, we all have problems with scaling. We have a lot of different talks about how we can better manage, I don't know, databases, how we can better manage our software, which tools we can use to scale better. But is it really hard? Here is a Google Trends chart (and by no means is this scientific research or anything like that), but it shows that searches for architecture and software architecture have been decreasing on Google since 2004, maybe because we already have resources to handle those scenarios and those kinds of problems.
We also have several books and articles published on this topic. Thousands of large-scale distributed systems are operating in the world: Amazon, Netflix, Facebook, Uber, they all handle this constantly, every single second. They have a lot of different pipelines and everything runs smoothly for them. Why not for DjangoVids? That's what we'll be looking at right now. Let's take a look at the video upload pipeline. And when I say pipeline, I apologize in advance to any data scientist or data engineer: I'm simply talking about a pipeline as a multi-step process. So, as we can see, once we upload the video, the raw video data goes through the first step, which is preprocessing, where we get the metadata, decode something, I don't know, something like that. Then it goes through the video enhancement process, which is the most important step and the bottleneck of this whole operation. Then it goes through a thumbnail generation step and then speech-to-text caption creation, and it outputs an enhanced video that will be available on the platform. And as I said, this video enhancement is the bottleneck right now. Why is this? The code behind it is a lot of code, but it's simple code, and it's the thing I mentioned: it's just somewhat bad code. For each frame in the video, we make an API request and we check the status code. If the status code isn't okay, we just explode everything, raising an exception. If it is okay, we enhance the frame, save the frame and go to the next one. I mean, there are a lot of problems here, just a lot of problems, but we won't go through all of them right now; we'll just take a small step to make it a bit better. But first, this visualization may help to understand. We start the enhancement and iterate through each frame of the video. For example, if we have a one-hour-long video at 30 fps (frames per second), that's more than 100,000 frames we'll be iterating through for just one video. We make one API call for each frame, we check the API response, we don't actually handle the failure (we just explode everything), and then we save the frame if it's okay. This exception handling is a volcano about to erupt. So these are the first pain points we can see in this whole process: we do not have any type of error handling, and we have a fully synchronous flow, so we go through each frame one by one, taking a lot of processing time, because remember, we have to make an API request for each frame. So let's make it better, just a bit. As I said, this is going to be an organic refactor, small steps toward a better outcome. The only change we'll be making is putting in a try/except block. This isn't magic or some extra fancy thing, but it does handle one of our pain points. Now we can ensure that we'll at least try to enhance every frame. Before, if frame number two failed for some reason, the whole thing would explode and we would never get to frames three, four and so on. Now at least we try to go through all of them, but it does still have a lot of pain points. How can we retry those failed enhancements? How can we parallelize those enhancements and make them, I don't know, maybe faster? And here is another visualization of this: we start the enhancement, iterate through the frames, now attempt to enhance each frame, actually handling the enhancement error, and then continue to the next frame.
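To make those two first stages concrete, here is a minimal sketch of what the frame loop might look like, first without and then with the try/except block. The helper names (extract_frames, save_frame) and the API endpoint are assumptions for illustration; the talk does not show the exact slide code.

```python
# A minimal sketch of stages 1 and 2 described above; helper names are hypothetical.
import requests

ENHANCE_API_URL = "https://enhance.example.com/v1/frames"  # assumed endpoint


def enhance_video_naive(video):
    """Stage 1: fully synchronous, one API call per frame, any failure aborts the run."""
    for frame in extract_frames(video):  # hypothetical helper
        response = requests.post(ENHANCE_API_URL, files={"frame": frame.raw_bytes})
        if response.status_code != 200:
            # A single bad response blows up the whole video.
            raise RuntimeError(f"Enhancement failed for frame {frame.number}")
        save_frame(frame, response.content)  # hypothetical helper


def enhance_video_with_error_handling(video):
    """Stage 2: same loop, but a try/except lets the remaining frames still be attempted."""
    for frame in extract_frames(video):
        try:
            response = requests.post(ENHANCE_API_URL, files={"frame": frame.raw_bytes})
            response.raise_for_status()
            save_frame(frame, response.content)
        except Exception:
            # Swallow the failure and move on; retrying failed frames is still unsolved here.
            continue
```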
As I said, it still needs a lot of refactoring, but it is just a bit better. And this is the whole idea of what we're doing here: to keep evolving bit by bit, PR after PR, to make it a little bit better. How can we keep improving this? We can handle API unavailability, we can retry failed enhancements, retry a frame that failed because, I don't know, maybe the API was not available for two hours and we had a lot of frames failing to enhance. How can we retry those? How can we parallelize those enhancements? For this specific pain point, we can use Celery. So Celery (and it could actually be any messaging system) will help us make it async, which is another small step toward a maybe more resilient implementation. Celery, for those who don't know, is a very popular package in the Python community and in the Django community as well. It's a distributed task queue. It's highly available, it's horizontally scalable (which will be very important for us), and it is a reliable distributed system to process vast amounts of messages, just like our use case. Because, as I said, if you have a one-hour-long video, we'll be handling more than a hundred thousand messages for this video alone. So this is the next step we can take to improve the enhancing algorithm. Now, instead of going through each frame and enhancing it inline, we just call a task: we queue a lot of different tasks, one for each frame, to try to enhance it. Celery already has availability and auto-retry configurations, but in my experience, and from what we had on my team, it isn't so intuitive to configure those kinds of things in Celery, so it is still a bit complex. Also, how can we pinpoint why frames failed? How can we improve this and have better observability without the need for an extra tool, for example without Datadog, New Relic or something like that? How can we get this from our database with just a Django query? This is the balance between the benefits and the challenges of Celery: it's very good that we now have an async process and the auto-retry capability, but we don't have a lot of visibility, it's a bit complex to manage workflows in Celery, and the error handling isn't great. That's where managers and idempotent executions enter the room: now we'll unlock their power using Django's ORM. It will be as simple as a model, as simple as running a query to get the data from that model. And why do we need idempotent executions? Basically, from this excerpt from Wikipedia, an idempotent operation is one that can be applied multiple times without changing the result beyond the initial application. So imagine we have a frame that we want to enhance. If we enhance this frame once, we get better sharpness, better noise reduction, and so on. If we enhance this frame again, we may get even better sharpness, even better noise reduction. I don't know if you have any familiarity with this kind of photo enhancement API, but basically they always find something to make a bit better. So it isn't natively idempotent: if we keep making the API request, if we keep reaching out to the API, it will keep enhancing and enhancing and enhancing. Sometimes you can't even perceive a difference, but there is a difference.
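A minimal sketch of what that Celery step could look like, using Celery's real autoretry_for / retry_backoff task options; the task name and the helper functions are assumptions, not code taken from the talk.

```python
# Stage 3 sketch: queue one Celery task per frame instead of enhancing frames inline.
from celery import shared_task


@shared_task(
    autoretry_for=(Exception,),      # Celery's built-in auto-retry configuration
    retry_backoff=True,              # exponential backoff between attempts
    retry_kwargs={"max_retries": 5},
)
def enhance_frame_task(video_id: int, frame_number: int) -> None:
    """Enhance a single frame; one task is queued per frame of the video."""
    frame = load_frame(video_id, frame_number)       # hypothetical helper
    enhanced = call_enhancement_api(frame)           # hypothetical helper
    save_frame(video_id, frame_number, enhanced)     # hypothetical helper


def enhance_video_async(video):
    """The former for loop now only enqueues work; workers pick it up in parallel."""
    for frame in extract_frames(video):              # hypothetical helper
        enhance_frame_task.delay(video.id, frame.number)
```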
So how can we ensure that we won't be trying to enhance frames that were already enhanced, skipping unnecessary API requests? That saves us time and also money, since we won't be reaching out to this expensive API. Here we enter the executions realm. Basically, you have a FrameEnhancementExecution, which is just a Django model. It's just a Django model that has a manager (we'll talk about it later on), it has a frame, the frame from the video you want to enhance, and it has a status. This status will be very important for us to have granularity over the state of each frame: we'll know which frame failed, which frame is running (that is, under the process of enhancement), which frames were not enhanced, and so on. It will be very important once we have parallelized tasks running with a lot of distributed workers, and it will make it very easy to pinpoint which frames are failing and which frames are pending enhancement. Taking a look now at the manager, the EnhancementManager, which has the list of executions we saw in the last slide: it's as simple as a Django model can be. It's just a model with a foreign key to the video, so we can track the status of the video enhancement, and it also has a status of its own, so we'll know if the manager has started, if it is running, how many frames of the video are being enhanced right now, and so on. And here is our updated algorithm. Basically, we create the manager, we create all the executions for this manager based on the video frames, and once again we delay the tasks. So it's not that much different from what we were doing before, but with this kind of implementation we can have, for example, a sidecar running in the background just for the failed frames: we can keep retrying those frames every 5 seconds, every 10 seconds, every minute. We have a lot of room for improvement here that we didn't have previously. If we take a step back and look at what we've done here: we came from a simple method that was fully synchronous, broke every so often and wasn't reliable. Now we still have a method that can break, it may not be fully resilient, fully reliable, but now we can understand which frames broke, which frames we couldn't enhance for some reason, maybe API availability or something like that. We can retry those frames without running the for loop again over the whole video, trying to enhance frames that were already enhanced. When we remember that each enhancement here is an API call, every frame we skip in our for loop is one less API call we're making. Zooming in now on the task itself: we basically select the execution, get the frame from the execution, and now we have a status check. This is where the idempotency enters. As I said before, enhancing a frame isn't natively idempotent: it will always try to enhance, even if the frame has already been enhanced. So it's just a status check. Again, it's not something fancy; it's not a finite state machine or anything like that. It's just a task status check: if the enhancement was already completed, we just skip this frame, there is no point in enhancing it again. So we do this check, then try to enhance the frame. If the enhancement throws any type of exception, we catch it, we save the execution as failed and we move on; we just go to the next frame that is running in parallel. So now we have a lot of workers in parallel.
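A minimal sketch of the two models and the dispatch step described here, assuming existing Video and Frame models in a "videos" app; the status values and field names are illustrative, since the transcript does not show the exact slide code. The per-frame task it dispatches is sketched after a later paragraph.

```python
# Stage 4 sketch: track the state of each frame's enhancement in the database.
from django.db import models


class ExecutionStatus(models.TextChoices):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


class EnhancementManager(models.Model):
    """Tracks the enhancement run for one whole video."""
    video = models.ForeignKey("videos.Video", on_delete=models.CASCADE)  # assumed app/model
    status = models.CharField(max_length=20, choices=ExecutionStatus.choices,
                              default=ExecutionStatus.PENDING)


class FrameEnhancementExecution(models.Model):
    """Tracks the enhancement of one single frame."""
    manager = models.ForeignKey(EnhancementManager, on_delete=models.CASCADE,
                                related_name="executions")
    frame = models.ForeignKey("videos.Frame", on_delete=models.CASCADE)  # assumed app/model
    status = models.CharField(max_length=20, choices=ExecutionStatus.choices,
                              default=ExecutionStatus.PENDING)


def start_video_enhancement(video):
    """Create the manager plus one execution per frame, then queue one task per execution."""
    manager = EnhancementManager.objects.create(video=video, status=ExecutionStatus.RUNNING)
    for frame in video.frames.all():  # assumes a reverse relation named "frames"
        execution = FrameEnhancementExecution.objects.create(manager=manager, frame=frame)
        enhance_frame_execution_task.delay(execution.id)  # task sketched in a later example
```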
We can have failed frames, and that's fine, because we don't need to retry every frame, just the failed ones. We can have a sidecar, or another design pattern, that just gets those frames with a simple database query: you can query the frames that failed for some reason and retry only those. So again, it's just a bit better than it was before, and there's always room for improvement, always something better we can do. Also, here at the top, where we get the FrameEnhancementExecution, we do a select_for_update, because we are now working in parallel. Sometimes, when you have a lot of different workers, one worker may pick up frame A while another worker picks up the same frame. That would create a race condition, and we don't want that. It's a concern of a more complex approach than before, but it's also a more resilient approach; that's why we need this select_for_update line. And this is now our algorithm, our process funnel: we create the manager, the manager has several executions, one for each frame of the video, we delay several tasks, one for each frame of the video, and we keep checking those statuses. We can run the enhancement process for the video once, maybe it will fail, then we can run the process again, and we just pick up the executions that have failed and do not try to enhance the frames that were already enhanced. If everything goes as expected, we have an enhanced frame at the end. And a question is: can we keep improving this? As I said, this is the mantra; the whole point of this talk is that there's always room for improvement, always something we can do to be a bit better, some step we can take in a different direction. We could save error metadata in the failed executions, for example, so we could know why an enhancement failed: maybe just a JSON field in the database (Postgres has a JSON field for this, MySQL as well). So that is something else we can do to understand what we could improve in the API request, what we can do to be more resilient. Also, we can implement this in other parts of the pipeline. As we saw in the beginning, DjangoVids has a multi-step pipeline, and this is only one piece of it. So how can we use this for other steps? How can we use this same strategy for other parts of the pipeline? Can we use it? On my team we do use it, it's something that is working for us, especially when we use the chain of responsibility, a principle where a step starts, runs and ends, and when this step ends, we start the next step, and so on. So the whole pipeline could be just a big chain of responsibility, where in the first step, the metadata step, we run a lot of different workers to collect different metadata types from the video; when all these workers are successful, we go to the next step, which is the video enhancement, the one we just saw; then to the next one, and the next one. And it's actually something really simple to implement once you get the grasp of it. Again, it's as simple as a Django model can be. And finally, what can we get from this experiment? Bad architecture design may, actually will, cause degraded performance. We haven't changed a lot of the architecture of the API or of the code we are dealing with; we basically just did some exception handling and made it async.
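A minimal sketch of the per-frame task with the select_for_update lock and the idempotency status check, plus the "sidecar-style" retry query; it reuses the names from the previous sketches and is an illustration under those assumptions, not the speaker's exact code.

```python
# Stage 4 sketch, continued: the idempotent per-frame task and a retry helper.
from celery import shared_task
from django.db import transaction


@shared_task
def enhance_frame_execution_task(execution_id: int) -> None:
    with transaction.atomic():
        # Lock the row so two workers cannot pick up the same frame (race condition).
        execution = (
            FrameEnhancementExecution.objects
            .select_for_update()
            .get(id=execution_id)
        )
        # Idempotency check: if this frame was already enhanced, skip the API call entirely.
        if execution.status == ExecutionStatus.COMPLETED:
            return
        execution.status = ExecutionStatus.RUNNING
        execution.save(update_fields=["status"])

    try:
        enhanced = call_enhancement_api(execution.frame)   # hypothetical helper
        save_frame_result(execution.frame, enhanced)       # hypothetical helper
        execution.status = ExecutionStatus.COMPLETED
    except Exception:
        # Mark the failure and move on; a later sweep re-queues only FAILED executions.
        execution.status = ExecutionStatus.FAILED
    execution.save(update_fields=["status"])


def retry_failed_frames(manager: EnhancementManager) -> None:
    """The 'sidecar' idea: a plain ORM query finds the failed executions and re-queues them."""
    for execution in manager.executions.filter(status=ExecutionStatus.FAILED):
        enhance_frame_execution_task.delay(execution.id)
```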
But yeah, that's step by step, bit by bit, just making it a bit better. We could, for example, not make an API request for each frame, because right now we're making 100,000 different requests for each video. So yeah, this could be way better than everything we did, but it would also be a big refactor that the team may not be ready to do. There's always a trade-off to understand what you can and cannot do right now. Properly scaling a simple pipeline like this should not be a pain; we just saw it's one step after another, very simple steps. It should not take 30 minutes to run, it should not even take 15 minutes to run, and with Celery and async workers we can just run more workers in parallel: ten, 15, 30, 100 workers running in parallel, picking up frames and so on. Architectures should always be evolving as more challenging scenarios appear. So we took the problem from the beginning and kept evolving and evolving until we had something resilient, but maybe not optimal yet (definitely not optimal yet), and we can keep improving it as more challenging scenarios appear. For example, now we have 100 workers working in parallel: how does the API handle 100 requests coming in parallel every second, every millisecond? That's a new challenge we need to face as a team. Async workers are our best friends when dealing with large amounts of data, but they can also be our worst enemies when dealing with large amounts of data, for example by causing an external, third-party API to have degraded performance, because now, as I said, I have a thousand workers reaching for this API every second; we are basically doing a DDoS on this third-party API. And idempotency is key when dealing with these scenarios, even when artificially created as we did here, because, as we said, the enhancement isn't natively idempotent. With a simple status check we can make it somewhat idempotent, making sure we won't be spending CPU cycles and API requests on something that cannot be enhanced any further. And that's it.
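The talk raises this "accidental DDoS" concern without prescribing a fix. One possible mitigation, not mentioned in the talk, is Celery's per-task rate_limit option, sketched below; note that Celery enforces rate limits per worker, so the effective global rate scales with the number of workers.

```python
# A possible throttle for the third-party API, using Celery's per-task rate limit.
from celery import shared_task


@shared_task(rate_limit="10/s")  # at most 10 of these tasks per second, per worker
def enhance_frame_execution_task(execution_id: int) -> None:
    ...  # same body as the earlier sketch
```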
speaker 2: Thank you. Thank you, Ricardo, for the talk. So my question is: when you started doing the optimization, were you trying to save each frame's details as a row in a table? So if you have, say, a one-gigabyte video and, as you said, around 30 frames per second, were you really recording that many frames in the database?
speaker 1: That's a great question. For this example alone, yeah, it happens like that, but this is, as the first point of the final thoughts said, just bad architecture. It shouldn't be like this; it doesn't make sense to do something like this. But yeah, it's something we could improve for sure. Maybe it's just a refactor that is way too big to do for something like this, and taking small steps to improve the pipeline could be a better approach. But yeah, I completely agree, it's just not a good architecture to begin with. [Next audience question:] So, from your experience, what is the biggest Celery pain point, something that grinds your gears? That's a great question, and I would say it is having complex workflows with Celery. That was actually the seed that started this whole movement for us, moving away from having Celery handle everything (workflows, chains and so on) toward a more Django-focused approach. Complex workflows, with one task depending on another: when task number three fails, what happens to tasks four, five and six? It's something quite complex to understand, at least in the earlier versions of Celery we were working with. Exactly, exactly: moving away from complex workflows in Celery, using Celery just as a message queue and that's it, but controlling the state in the database, was our goal. [Host:] Thank you, Ricardo. Thank you, everyone. Thank you.

Latest Summary (Detailed Summary)

Generated at 2025-06-21 18:14

Overview / Executive Summary

This talk by software engineer Ricardo Morato Rocha uses the "DjangoVids" video platform as a running example to show how organic refactoring can gradually turn a fragile, inefficient data-processing pipeline into a robust, scalable, idempotent system. The core problem is the scalability of a video-enhancement pipeline that initially made a synchronous API call for every video frame and crashed entirely whenever a single call failed.

The talk presents a clear evolution, and the final core solution is to use the Django ORM for fine-grained state management. Two models, EnhancementManager and FrameEnhancementExecution, record a status (pending, succeeded, failed, etc.) for every unit of work (a video frame), decoupling the complex state logic from Celery and moving it into the database.

This approach delivers three key advantages:
1. Artificial idempotency: checking the status before executing a task avoids reprocessing frames that already succeeded, saving significant API costs and compute.
2. Precise, efficient retries: failed tasks can be queried and retried on their own, with no need to re-walk the whole dataset. For a video with roughly 100,000 frames, this reduces a retry run from 100,000 API calls to only the calls for the failed frames.
3. Deep observability: the detailed status of every task can be monitored and diagnosed with plain database queries, without relying on external tools, giving full control over the process.

The closing conclusion stresses that bad architecture is the root cause of performance bottlenecks, but small iterative steps can fix it effectively. Even when the processing logic itself is not idempotent, idempotency can be "artificially created" through state management, which is essential when running distributed tasks at scale.


Problem Background and Positioning of the Talk

The talk uses "DjangoVids", a video platform for the Django community, as its example; the platform faces scaling challenges as its users and videos grow.

  • Core flow: after a video is uploaded, an "enhancing pipeline" optimizes sharpness, contrast and so on for every frame.
  • Main pain point: the video-enhancement step is the performance bottleneck; it takes too long and is hard to scale.
  • Positioning: the speaker explicitly frames the talk as a "spiritual follow-up" to Jake Howard's talk on background workers at last year's DjangoCon, digging deeper into how to build more resilient background task systems.

The Pipeline's Evolution: From Fragile to Robust

Through a "thought experiment", the speaker walks the problematic code through four refactoring stages, compared below:

1. Fragile synchronous implementation
  • Core approach: a single loop making a synchronous API call for each frame.
  • Pros: simple to implement; the logic is direct.
  • Cons and challenges: extremely fragile (one failed frame aborts the whole job); inefficient (fully serial, no concurrency).
2. Basic error handling
  • Core approach: a try...except block inside the loop.
  • Pros: more robust; a single frame failure no longer aborts the whole run.
  • Cons and challenges: the performance bottleneck remains (still synchronous); no convenient way to retry failed frames.
3. Asynchronous processing with Celery
  • Core approach: each frame's processing is wrapped in its own Celery task.
  • Pros: asynchronous, so the main flow responds quickly; horizontally scalable by adding workers; Celery provides basic automatic retries.
  • Cons and challenges: poor observability (hard to trace why a specific frame failed); workflow management is not intuitive; fine-grained retry policies (e.g. backoff) are fiddly to configure.
4. State-driven idempotency
  • Core approach: Celery plus the Django ORM, with a database model tracking each frame's status.
  • Pros: artificial idempotency avoids duplicate processing; precise retries of failed tasks only; excellent observability through database queries; separation of concerns (Celery executes, Django tracks state).
  • Cons and challenges: extra architectural complexity (a database state-management layer); concurrency must be handled, e.g. with select_for_update to prevent race conditions.

Core Solution: State-Driven Idempotency with the Django ORM

This is the talk's core solution: use the Django ORM to make up for Celery's shortcomings in state management and gain full control over the task lifecycle.

  • Key components

    1. EnhancementManager model: manages the enhancement run for a whole video and records the overall status.
    2. FrameEnhancementExecution model: the core piece, tracking the processing status of each frame. Key fields include frame and status (e.g. PENDING, RUNNING, COMPLETED, FAILED), optionally extended with a JSON field for error metadata.
  • The new workflow

    1. Create the state records: when the run starts, create a FrameEnhancementExecution record with status PENDING for every frame of the video.
    2. Dispatch Celery tasks: queue one Celery task per pending record.
    3. Idempotency logic inside the task:
      • Lock the record: use select_for_update() to lock the database row so multiple workers cannot process the same frame, avoiding race conditions.
      • Check the status: before doing any work, check the record's status; if it is COMPLETED, skip it. This is the key to artificial idempotency.
      • Execute and update: call the enhancement API; on success set the status to COMPLETED, on failure set it to FAILED and record the error details.

Key Conclusions and Future Directions

  • Directions for further improvement

    1. Store detailed metadata (such as the exception traceback) on failed records to simplify debugging.
    2. Extend the same pattern to the other pipeline steps (metadata extraction, thumbnail generation, and so on), forming a "Chain of Responsibility"; see the sketch after this section.
  • Key conclusions

    1. Architecture determines performance: poor architectural design is the root cause of performance problems.
    2. Iterative improvement: faced with a large refactor, an organic "step by step, bit by bit" approach is a pragmatic and effective strategy.
    3. The double-edged nature of async workers: asynchronous workers are powerful when handling huge volumes of data, but uncontrolled concurrency can overwhelm a third-party API, effectively becoming a DDoS.
    4. The key role of idempotency: "Idempotency is key when dealing with these scenarios, even when artificially created as we did here." It prevents wasted resources in distributed, retry-heavy scenarios.
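As a rough illustration of the chain-of-responsibility direction mentioned above, the sketch below assumes the manager model carries a step field and a hypothetical start_step dispatcher; neither appears in the talk, and the step names are invented for the example.

```python
# A sketch of chaining pipeline steps: a step starts the next one only once all of its
# executions have completed. `manager.step` and `start_step` are assumptions.
PIPELINE_STEPS = ["metadata", "enhancement", "thumbnails", "captions"]  # assumed order


def on_execution_finished(manager: "EnhancementManager") -> None:
    """Called whenever one execution of a step finishes (e.g. at the end of the Celery task)."""
    unfinished = manager.executions.exclude(status=ExecutionStatus.COMPLETED)
    if unfinished.exists():
        return  # this step is still running; nothing to chain yet

    current = PIPELINE_STEPS.index(manager.step)                 # assumed field
    if current + 1 < len(PIPELINE_STEPS):
        start_step(manager.video, PIPELINE_STEPS[current + 1])   # hypothetical dispatcher
```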

Q&A Summary

  • Question 1: Is it efficient to create one database row per frame?

    • Ricardo's answer: he acknowledged that this is "just not a good architecture to begin with". But the point of the talk is to show how to improve an existing, imperfect system in small iterative steps, because a sweeping rewrite is often not feasible for the team.
  • Question 2: What is Celery's biggest pain point?

    • Ricardo's answer: handling "complex workflows". This is exactly what pushed his team to move state-control logic out of Celery and into the Django database. Their goal: "using Celery just as a message queue, but controlling the state in the database."