1.PyTorch - DataLoader 源码解析(一)
2.å
³äºbatch å pascal
3.游戏引擎随笔 0x36:UE5.x Nanite 源码解析之可编程光栅化(下)
4.BERT源码逐行解析
5.PyTorch 源码分析(三):torch.nn.Norm类算子
6.深入理解Pytorch的产源BatchNorm操作(含部分源码)
PyTorch - DataLoader 源码解析(一)
本文为作者基于个人经验进行的初步解析,由于能力有限,源码可能存在遗漏或错误,产源敬请各位批评指正。源码
本文并未全面解析 DataLoader 的产源全部源码,仅对 DataLoader 与 Sampler 之间的源码tomcar源码联系进行了分析。以下内容均基于单线程迭代器代码展开,产源多线程情况将在后续文章中阐述。源码
以一个简单的产源数据集遍历代码为例,在循环中,源码数据是产源如何从 loader 中被取出的?通过断点调试,我们发现循环时,源码代码进入了 torch.utils.data.DataLoader 类的产源 __iter__() 方法,具体内容如下:
可以看到,源码该函数返回了一个迭代器,产源主要由 self._get_iterator() 和 self._iterator._reset(self) 提供。接下来,我们进入 self._get_iterator() 方法查看迭代器的产生过程。
在此方法中,根据 self.num_workers 的数量返回了不同的迭代器,主要区别在于多线程处理方式不同,但这两种迭代器都是继承自 _BaseDataLoaderIter 类。这里我们先看单线程下的例子,进入 _SingleProcessDataLoaderIter(self)。
构造函数并不复杂,在父类的构造器中执行了大量初始化属性,然后在自己的构造器中获得了一个 self._dataset_fetcher。此时继续单步前进断点,发现程序进入到了父类的 __next__() 方法中。
在分析代码之前,我们先整理一下目前得到的信息:
下面是 __next__() 方法的内容:
可以看到最后返回的是变量 data,而 data 是由 self._next_data() 生成的,进入这个方法,我们发现这个方法由子类负责实现。
在这个方法中,我们可以看到数据从 self._dataset_fecther.fetch() 中得到,需要依赖参数 index,而这个 index 由 self._next_index() 提供。进入这个方法可以发现它是由父类实现的。
而前面的 index 实际上是由这个 self._sampler_iter 迭代器提供的。查找 self._sampler_iter 的定义,我们发现其在构造函数中。
仔细观察,我们可以在倒数第 4 行发现 self._sampler_iter = iter(self._index_sampler),这个迭代器就是这里的 self._index_sampler 提供的,而 self._index_sampler 来自 loader._index_sampler。这个 loader 就是最外层的 DataLoader。因此我们回到 DataLoader 类中查看这个 _index_sampler 是如何得到的。
我们可以发现 _index_sampler 是一个由 @property 装饰得到的属性,会根据 self._auto_collation 来返回 self.batch_sampler 或者 self.sampler。再次整理已知信息,我们可以得到:
因此,只要知道 batch_sampler 和 sampler 如何返回 index,就能了解整个流程。
首先发现这两个属性来自 DataLoader 的构造函数,因此下面先分析构造函数。
由于构造函数代码量较大,因此这里只关注与 Sampler 相关的部分,代码如下:
在这里我们只关注以下部分:
代码首先检查了参数的合法性,然后进行了一轮初始化属性,接着判断了 dataset 的类型,处理完特殊情况。接下来,函数对参数冲突进行了判断,共判断了 3 种参数冲突:
检查完参数冲突后,函数开始创建 sampler 和 batch_sampler,如下图所示:
注意,仅当未指定 sampler 时才会创建 sampler;同理,仅在未指定 batch_sampler 且存在 batch_size 时才会创建 batch_sampler。
在 DataLoader 的构造函数中,如果不指定参数 batch_sampler,则默认创建 BatchSampler 对象。该对象需要一个 Sampler 对象作为参数参与构造。这也是在构造函数中,batch_sampler 与 sampler 冲突的原因之一。因为传入一个 batch_sampler 时,说明 sampler 已经作为参数完成了 batch_sampler 的构造,若再将 sampler 传入 DataLoader 是多余的。
以第一节中的简单代码为例,此时并未指定 Sampler 和 batch_sampler,也未指定 batch_size,默认为 1,因此在 DataLoader 构造时,创建了一个 SequencialSampler,并传入了 BatchSampler 进行构建。继续第一节中的断点,可以发现:
具体使用 sampler 还是 batch_sampler 来生成 index,取决于 _auto_collation,wepapp聊天源码而从上面的代码发现,只要存在 self.batch_sampler 就永远使用 batch_sampler 来生成。batch_sampler 与 sampler 冲突的原因之二:若不设置冲突,那么使用者试图同时指定 batch_sampler 与 sampler 后,尤其是在使用者继承了新的 Sampler 子类后, sampler 在获取数据的时候完全没有被使用,这对开发者来说是一个困惑的现象,容易引起不易察觉的 BUG。
继续断点发现程序进入了 BatchSampler 的 __iter__() 方法,代码如下:
从代码中可以发现,程序不停地从 self.sampler 中获取 idx 加入列表,直到填满一个 batch 的量,并将这一整个 batch 的 index 返回到迭代器的 _next_data()。
此处由 self._dataset_fetcher.fetch(index) 来获取真正的数据,进入函数后看到:
这里依然根据 self.auto_collation(来自 DataLoader._auto_collation)进行分别处理,但是总体逻辑都是通过 self.dataset[] 来调用 Dataset 对象的 __getitem__() 方法。
此处的 Dataset 是来自 torchvision 的 DatasetFolder 对象,这里读取文件路径中的后,经过转换变为 Tensor 对象,与标签 target 一起返回。参数中的 index 是由迭代器的 self._dataset_fetcher.fetch() 传入。
整个获取数据的流程可以用以下流程图简略表示:
注意:
另附:
对于一条循环语句,在执行过程中发生了以下事件:
å ³äºbatch å pascal
1.bat/cmdä¸è½å å¯ï¼æ²¡åæ³ç2. ä½ è¦çæ¯è¿ä¸ªï¼
procedure foo;procedure nested;
begin
// nested procedure code here
end;
begin
// foo code here
end;
游戏引擎随笔 0x:UE5.x Nanite 源码解析之可编程光栅化(下)
书接上回。
在展开正题之前,先做必要的铺垫,解释纳尼特(Nanite)技术方案中的Vertex Reuse Batch。纳尼特在软光栅路径实现机制中,将每个Cluster对应一组线程执行软光栅,每ThreadGroup有个线程。在光栅化三角形时访问三角形顶点数据,但顶点索引范围可能覆盖整个Cluster的个顶点,因此需要在光栅化前完成Cluster顶点变换。纳尼特将变换后的顶点存储于Local Shared Memory(LDS)中,进行组内线程同步,确保所有顶点变换完成,光栅化计算时直接访问LDS,实现软光栅高性能。
然而,在使用PDO(Masked)等像素可编程光栅化时,纳尼特遇到了性能问题。启用PDO或Mask时,可能需要读取Texture,根据读取的Texel决定像素光栅化深度或是否被Discard。读取纹理需计算uv坐标,而uv又需同时计算重心坐标,增加指令数量,降低寄存器使用效率,影响Active Warps数量,降低延迟隐藏能力,导致整体性能下降。复杂材质指令进一步加剧问题。
此外,当Cluster包含多种材质时,同一Cluster中的三角形被重复光栅化多次,尤其是材质仅覆盖少数三角形时,大量线程闲置,浪费GPU计算资源。
为解决这些问题,纳尼特引入基于GPU SIMT/SIMD的Vertex Reuse Batch技术。技术思路如下:将每个Material对应的三角形再次分为每个为一组的Batch,每Batch对应一组线程,每个ThreadGroup有个线程,正好对应一个GPU Warp。利用Wave指令共享所有线程中的变换后的顶点数据,无需LDS,减少寄存器数量,增加Warp占用率,提升整体性能。
Vertex Reuse Batch技术的启用条件由Shader中的NANITE_VERT_REUSE_BATCH宏控制。
预处理阶段,纳尼特在离线时构建Vertex Reuse Batch,核心逻辑在NaniteEncode.cpp中的BuildVertReuseBatches函数。通过遍历Material Range,统计唯一顶点数和三角形数,达到顶点去重和优化性能的目标。
最终,数据被写入FPackedCluster,根据材质数量选择直接或通过ClusterPageData存储Batch信息。Batch数据的Pack策略确保数据对齐和高效存储。
理解Vertex Reuse Batch后,再来回顾Rasterizer Binning的数据:RasterizerBinData和RasterizerBinHeaders。在启用Vertex Reuse Batch时,这两者包含的是Batch相关数据,Visible Index实际指的是Batch Index,而Triangle Range则对应Batch的三角形数量。
当Cluster不超过3个材质时,直接从FPackedCluster中的talend项目实例源码VertReuseBatchInfo成员读取每个材质对应的BatchCount。有了BatchCount,即可遍历所有Batch获取对应的三角形数量。在Binning阶段的ExportRasterizerBin函数中,根据启用Vertex Reuse Batch的条件调整BatchCount,表示一个Cluster对应一个Batch。
接下来,遍历所有Batch并将其对应的Cluster Index、Triangle Range依次写入到RasterizerBinData Buffer中。启用Vertex Reuse Batch时,通过DecodeVertReuseBatchInfo函数获取Batch对应的三角形数量。对于不超过3个材质的Cluster,DecodeVertReuseBatchInfo直接从Cluster的VertReuseBatchInfo中Unpack出Batch数据,否则从ClusterPageData中根据Batch Offset读取数据。
在Binning阶段的AllocateRasterizerBinCluster中,还会填充Indirect Argument Buffer,将当前Cluster的Batch Count累加,用于硬件光栅化Indirect Draw的Instance参数以及软件光栅化Indirect Dispatch的ThreadGroup参数。这标志着接下来的光栅化Pass中,每个Instance和ThreadGroup对应一个Batch,以Batch为光栅化基本单位。
终于来到了正题:光栅化。本文主要解析启用Vertex Reuse Batch时的软光栅源码,硬件光栅化与之差异不大,此处略过。此外,本文重点解析启用Vertex Reuse Batch时的光栅化源码,对于未启用部分,除可编程光栅化外,与原有固定光栅化版本差异不大,不再详细解释。
CPU端针对硬/软光栅路径的Pass,分别遍历所有Raster Bin进行Indirect Draw/Dispatch。由于Binning阶段GPU中已准备好Draw/Dispatch参数,因此在Indirect Draw/Dispatch时只需设置每个Raster Bin对应的Argument Offset即可。
由于可编程光栅化与材质耦合,导致每个Raster Bin对应的Shader不同,因此每个Raster Bin都需要设置各自的PSO。对于不使用可编程光栅化的Nanite Cluster,即固定光栅化,为不降低原有性能,在Shader中通过两个宏隔绝可编程和固定光栅化的执行路径。
此外,Shader中还包括NANITE_VERT_REUSE_BATCH宏,实现软/硬光栅路径、Compute Pipeline、Graphics Pipeline、Mesh Shader、Primitive Shader与材质结合生成对应的Permutation。这部分代码冗长繁琐,不再详细列出讲解,建议自行阅读源码。
GPU端软光栅入口函数依旧是MicropolyRasterize,线程组数量则根据是否启用Vertex Reuse Batch决定。
首先判断是否使用Rasterizer Binning渲染标记,启用时根据VisibleIndex从Binning阶段生成的RasterizerBinHeaders和RasterizerBinData Buffer中获取对应的Cluster Index和光栅化三角形的起始范围。当启用Vertex Reuse Batch,这个范围是Batch而非Cluster对应的范围。
在软光栅中,每线程计算任务分为三步。第一步利用Wave指令共享所有线程中的Vertex Attribute,线程数设置为Warp的Size,目前为,每个Lane变换一个顶点,最多变换个顶点。由于三角形往往共用顶点,直接根据LaneID访问顶点可能重复,为确保每个Warp中的每个Lane处理唯一的顶点,需要去重并返回当前Lane需要处理的唯一顶点索引,通过DeduplicateVertIndexes函数实现。同时返回当前Lane对应的三角形顶点索引,用于三角形设置和光栅化步骤。
获得唯一顶点索引后,进行三角形设置。这里代码与之前基本一致,只是写成模板函数,将Sub Pixel放大倍数SubpixelSamples和是否背面剔除bBackFaceCull作为模板参数,通过使用HLSL 语法实现。
最后是光栅化三角形写入像素。在Virtual Shadow Map等支持Nanite的场景下,定义模板结构TNaniteWritePixel来实现不同应用环境下Nanite光栅化Pipeline的细微差异。
在ENABLE_EARLY_Z_TEST宏定义时,调用EarlyDepthTest函数提前剔除像素,减少后续重心坐标计算开销。当启用NANITE_PIXEL_PROGRAMMABLE宏时,可以使用此机制提前剔除像素。
最后重点解析前面提到的DeduplicateVertIndexes函数。
DeduplicateVertIndexes函数给每个Lane返回唯一的会说话tomjava源码顶点索引,同时给当前Lane分配三角形顶点索引以及去重后的顶点数量。
首先通过DecodeTriangleIndices获取Cluster Local的三角形顶点索引,启用Cluster约束时获取所有Lane中最小的顶点索引,即顶点基索引。将当前三角形顶点索引(Cluster Local)减去顶点基索引,得到相对顶点基索引的局部顶点索引。
接下来生成顶点标志位集合。遍历三角形三个顶点,将局部顶点索引按顺序设置到对应位,表示哪些顶点已被使用。每个标志位是顶点的索引,并在已使用的顶点位置处设置为1。使用uint2数据类型,最多表示个顶点位。
考虑Cluster最多有个顶点,为何使用位uint2来保存Vertex Mask而非位?这是由于Nanite在Build时启用了约束机制(宏NANITE_USE_CONSTRAINED_CLUSTERS),该机制保证了Cluster中的三角形顶点索引与当前最大值之差必然小于(宏CONSTRAINED_CLUSTER_CACHE_SIZE),因此,生成的Triangle Batch第一个索引与当前最大值之差将不小于,并且每个Batch最多有个唯一顶点,顶点索引差的最大值为,仅需2个位数据即可。约束机制确保使用更少数据和计算。
将所有Lane所标记三个顶点的Vertex Mask进行位合并,得到当前Wave所有顶点位掩码。通过FindNthSetBit函数找出当前Lane对应的Mask索引,加上顶点基索引得到当前Lane对应的Cluster Local顶点索引。
接下来获取当前Lane对应的三角形的Wave Local的三个顶点索引,用于后续通过Wave指令访问其他Lane中已经计算完成的顶点属性。通过MaskedBitCount函数根据Vertex Mask以及前面局部顶点索引通过前缀求和得到当前Lane对应的Vertex Wave Local Index。
最后统计Vertex Mask所有位,返回总计有效的顶点数量。
注意FindNthSetBit函数,实现Lane与顶点局部索引(减去顶点基索引)的映射,返回当前Lane对应的Vertex Mask中被设置为1的位索引。如果某位为0,则返回下一个位为1的索引。如果Mask中全部位都设置为1,则实际返回为Lane索引。通过二分法逐渐缩小寻找索引范围,不断更新所在位置,最后返回找到的位置索引。
最后,出于验证目的进行了Vertex Reuse Batch的性能测试。在材质包含WPO、PDO或Mask时关闭Vertex Reuse Batch功能,与开启功能做对比。测试场景为由每颗万个三角形的树木组成的森林,使用Nsight Graphics进行Profiling,得到GPU统计数据如下:
启用Vertex Reuse Batch后,软光栅总计耗时减少了1.毫秒。SM Warp总占用率有一定提升。SM内部工作量分布更加均匀,SM Launch的总Warp数量提升了一倍。长短板Stall略有增加,但由于完全消除了由于LDS同步导致的Barrier Stall,总体性能还是有很大幅度的提升。
至此,Nanite可编程光栅化源码解析讲解完毕。回顾整个解析过程,可以发现UE5团队并未使用什么高深的黑科技,而是依靠引擎开发者强悍的工程实现能力完成的,尤其是在充分利用GPU SIMT/SIMD机制榨干机能的同时,保证了功能与极限性能的实现。这种能力和精神,都很值得我们学习。
BERT源码逐行解析
解析BERT源码,关键在于理解Tensor的形状,这些我在注释中都做了标注,以来自huggingface的PyTorch版本为例。首先,BertConfig中的参数,如bert-base-uncased,包含了word_embedding、position_embedding和token_type_embedding三部分,它们合成为BertEmbedding,形状为[batch_size, seq_len, hidden_size],如( x x )。
Bert的基石是Multi-head-self-attention,这部分是理解BERT的核心。代码中对相对距离编码有详细注释,通过计算左右端点位置,形成一个[seq_len, seq_len]的相对位置矩阵。接着是BertSelfOutput,执行add和norm操作。
BertAttention则将Self-Attention和Self-Output结合起来。BertIntermediate部分,倚天剑源码对应BERT模型中的一个FFN(前馈神经网络)部分,而BertOutput则相当直接。最后,BertLayer就是将这些组件组装成一个完整的层,BERT模型就是由多个这样的层叠加而成的。
PyTorch 源码分析(三):torch.nn.Norm类算子
PyTorch源码详解(三):torch.nn.Norm类算子深入解析
Norm类算子在PyTorch中扮演着关键角色,它们包括BN(BatchNorm)、LayerNorm和InstanceNorm。1. BN/LayerNorm/InstanceNorm详解
BatchNorm(BN)的核心功能是对每个通道(C通道)的数据进行标准化,确保数据在每个批次后保持一致的尺度。它通过学习得到的gamma和beta参数进行缩放和平移,保持输入和输出形状一致,同时让数据分布更加稳定。 gamma和beta作为动态调整权重的参数,它们在BN的学习过程中起到至关重要的作用。2. Norm算子源码分析
继承关系:Norm类在PyTorch中具有清晰的继承结构,子类如BatchNorm和InstanceNorm分别继承了其特有的功能。
BN与InstanceNorm实现:在Python代码中,BatchNorm和InstanceNorm的实例化和计算逻辑都包含对输入数据的2D转换,即将其分割为M*N的矩阵。
计算过程:在计算过程中,首先计算每个通道的均值和方差,这是这些标准化方法的基础步骤。
C++侧的源码洞察
C++实现中,对于BatchNorm和LayerNorm,代码着重于处理数据的标准化操作,同时确保线程安全,通过高效的数据视图和线程视图处理来提高性能。深入理解Pytorch的BatchNorm操作(含部分源码)
Pytorch中的BatchNorm操作在训练和测试模式下有所不同,特别是在涉及dropout时。Batch Normalization(BN)是深度学习中的重要技术,通过在神经网络中间层对输入数据进行标准化处理,解决协方差偏移问题。其核心公式包含对每个通道数据的均值和方差计算,规范化操作后进行仿射变换以保持模型性能。
在BN中,需要关注的参数主要包括学习参数gamma和beta,以及动态统计的running_mean和running_var。在Pytorch的实现中,如nn.BatchNorm2d API,关键参数包括trainning(模型是否在训练模式)、affine(是否启用仿射变换)、track_running_stats(是否跟踪动态统计)和momentum(动态统计更新的权重)。
训练状态会影响BN层的计算,当模型处于训练状态(trainning=True)时,running_mean和running_var会在每次前向传播(forward())中更新,而转为测试模式(mode.eval())则会冻结这些统计值。源码中的_NormBase类和_BatchNorm类定义了这些操作的细节,包括动态统计的管理。
对于自定义BN,可以重载前向传播函数,改变规范化操作的细节。总的来说,理解Pytorch的BatchNorm操作,需关注其在训练和测试模式中的行为,以及与模型训练状态相关的关键参数。
一文搞懂大数据批量处理框架Spring Batch的完美解析方案是什么。
如今微服务架构讨论的如火如荼。但在企业架构里除了大量的OLTP交易外,还存在海量的批处理交易。在诸如银行的金融机构中,每天有3-4万笔的批处理作业需要处理。针对OLTP,业界有大量的开源框架、优秀的架构设计给予支撑;但批处理领域的框架确凤毛麟角。是时候和我们一起来了解下批处理的世界哪些优秀的框架和设计了,今天我将以SpringBatch为例,和大家一起探秘批处理的世界。
初识批处理典型场景探秘领域模型及关键架构实现作业健壮性与扩展性批处理框架的不足与增强批处理典型业务场景
对账是典型的批处理业务处理场景,各个金融机构的往来业务和跨主机系统的业务都会涉及到对账的过程,如大小额支付、银联交易、人行往来、现金管理、POS业务、ATM业务、证券公司资金账户、证券公司与证券结算公司。
下面是某行网银的部分日终跑批实例场景需求。
涉及到的需求点包括:
批量的每个单元都需要错误处理和回退;每个单元在不同平台中运行;需要有分支选择;每个单元需要监控和获取单元处理日志;提供多种触发规则,按日期,日历,周期触发;
除此之外典型的批处理适用于如下的业务场景:
定期提交批处理任务(日终处理)并行批处理:并行处理任务企业消息驱动处理大规模的并行处理手动或定时重启按顺序处理依赖的任务(可扩展为工作流驱动的批处理)部分处理:忽略记录(例如在回滚时)完整的批处理事务
与OLTP类型交易不同,批处理作业两个典型特征是批量执行与自动执行(需要无人值守):前者能够处理大批量数据的导入、导出和业务逻辑计算;后者无需人工干预,能够自动化执行批量任务。
在关注其基本功能之外,还需要关注如下的几点:
健壮性:不会因为无效数据或错误数据导致程序崩溃;可靠性:通过跟踪、监控、日志及相关的处理策略(重试、跳过、重启)实现批作业的可靠执行;扩展性:通过并发或者并行技术实现应用的纵向和横向扩展,满足海量数据处理的性能需求;
苦于业界真的缺少比较好的批处理框架,SpringBatch是业界目前为数不多的优秀批处理框架(Java语言开发),SpringSource和Accenture(埃森哲)共同贡献了智慧。
Accenture在批处理架构上有着丰富的工业级别的经验,贡献了之前专用的批处理体系框架(这些框架历经数十年研发和使用,为SpringBatch提供了大量的参考经验)。
SpringSource则有着深刻的技术认知和Spring框架编程模型,同时借鉴了JCL(JobControlLanguage)和COBOL的语言特性。年JSR-将批处理纳入规范体系,并被包含在了JEE7之中。这意味着,所有的JEE7应用服务器都会有批处理的能力,目前第一个实现此规范的应用服务器是Glassfish4。当然也可以在JavaSE中使用。
但最为关键的一点是:JSR-规范大量借鉴了SpringBatch框架的设计思路,从上图中的核心模型和概念中可以看出究竟,核心的概念模型完全一致。
通过SpringBatch框架可以构建出轻量级的健壮的并行处理应用,支持事务、并发、流程、监控、纵向和横向扩展,提供统一的接口管理和任务管理。
框架提供了诸如以下的核心能力,让大家更关注在业务处理上。更是提供了如下的丰富能力:
明确分离批处理的执行环境和应用将通用核心的服务以接口形式提供提供“开箱即用”的简单的默认的核心执行接口提供Spring框架中配置、自定义、和扩展服务所有默认实现的核心服务能够容易的被扩展与替换,不会影响基础层提供一个简单的部署模式,使用Maven进行编译批处理关键领域模型及关键架构
先来个HelloWorld示例,一个典型的批处理作业。
典型的一个作业分为3部分:作业读、作业处理、作业写,也是典型的三步式架构。整个批处理框架基本上围绕Read、Process、Writer来处理。除此之外,框架提供了作业调度器、作业仓库(用以存放Job的元数据信息,支持内存、DB两种模式)。
完整的领域概念模型参加下图:
JobLauncher(作业调度器)是SpringBatch框架基础设施层提供的运行Job的能力。通过给定的Job名称和作JobParameters,可以通过JobLauncher执行Job。
通过JobLauncher可以在Java程序中调用批处理任务,也可以在通过命令行或者其它框架(如定时调度框架Quartz)中调用批处理任务。
JobRepository来存储Job执行期的元数据(这里的元数据是指JobInstance、JobExecution、JobParameters、StepExecution、ExecutionContext等数据),并提供两种默认实现。
一种是存放在内存中;另一种将元数据存放在数据库中。通过将元数据存放在数据库中,可以随时监控批处理Job的执行状态。Job执行结果是成功还是失败,并且使得在Job失败的情况下重新启动Job成为可能。Step表示作业中的一个完整步骤,一个Job可以有一个或者多个Step组成。
批处理框架运行期的模型也非常简单:
JobInstance(作业实例)是一个运行期的概念,Job每执行一次都会涉及到一个JobInstance。
JobInstance来源可能有两种:一种是根据设置的JobParameters从JobRepository(作业仓库)中获取一个;如果根据JobParameters从JobRepository没有获取JobInstance,则新创建一个新的JobInstance。
JobExecution表示Job执行的句柄,一次Job的执行可能成功也可能失败。只有Job执行成功后,对应的JobInstance才会被完成。因此在Job执行失败的情况下,会有一个JobInstance对应多个JobExecution的场景发生。
总结下批处理的典型概念模型,其设计非常精简的十个概念,完整支撑了整个框架。
Job提供的核心能力包括作业的抽象与继承,类似面向对象中的概念。对于执行异常的作业,提供重启的能力。
框架在Job层面,同样提供了作业编排的概念,包括顺序、条件、并行作业编排。
在一个Job中配置多个Step。不同的Step间可以顺序执行,也可以按照不同的条件有选择的执行(条件通常使用Step的退出状态决定),通过next元素或者decision元素来定义跳转规则;
为了提高多个Step的执行效率,框架提供了Step并行执行的能力(使用split进行声明,通常该情况下需要Step之间没有任何的依赖关系,否则容易引起业务上的错误)。Step包含了一个实际运行的批处理任务中的所有必需的信息,其实现可以是非常简单的业务实现,也可以是非常复杂的业务处理,Step的复杂程度通常是业务决定的。
每个Step由ItemReader、ItemProcessor、ItemWriter组成,当然根据不同的业务需求,ItemProcessor可以做适当的精简。同时框架提供了大量的ItemReader、ItemWriter的实现,提供了对FlatFile、XML、Json、DataBase、Message等多种数据类型的支持。
框架还为Step提供了重启、事务、重启次数、并发数;以及提交间隔、异常跳过、重试、完成策略等能力。基于Step的灵活配置,可以完成常见的业务功能需求。其中三步走(Read、Processor、Writer)是批处理中的经典抽象。
作为面向批的处理,在Step层提供了多次读、处理,一次提交的能力。
在Chunk的操作中,可以通过属性commit-interval设置read多少条记录后进行一次提交。通过设置commit-interval的间隔值,减少提交频次,降低资源使用率。Step的每一次提交作为一个完整的事务存在。默认采用Spring提供的声明式事务管理模式,事务编排非常方便。如下是一个声明事务的示例:
框架对于事务的支持能力包括:
Chunk支持事务管理,通过commit-interval设置每次提交的记录数;支持对每个Tasklet设置细粒度的事务配置:隔离界别、传播行为、超时;支持rollback和norollback,通过skippable-exception-classes和no-rollback-exception-classes进行支撑;支持JMSQueue的事务级别配置;
另外,在框架资深的模型抽象方面,SpringBatch也做了极为精简的抽象。
仅仅使用六张业务表存储了所有的元数据信息(包括Job、Step的实例,上下文,执行器信息,为后续的监控、重启、重试、状态恢复等提供了可能)。
BATCH_JOB_INSTANCE:作业实例表,用于存放Job的实例信息BATCH_JOB_EXECUTION_PARAMS:作业参数表,用于存放每个Job执行时候的参数信息,该参数实际对应Job实例的。BATCH_JOB_EXECUTION:作业执行器表,用于存放当前作业的执行信息,比如创建时间,执行开始时间,执行结束时间,执行的那个Job实例,执行状态等。BATCH_JOB_EXECUTION_CONTEXT:作业执行上下文表,用于存放作业执行器上下文的信息。BATCH_STEP_EXECUTION:作业步执行器表,用于存放每个Step执行器的信息,比如作业步开始执行时间,执行完成时间,执行状态,读写次数,跳过次数等信息。BATCH_STEP_EXECUTION_CONTEXT:作业步执行上下文表,用于存放每个作业步上下文的信息。实现作业的健壮性与扩展性
批处理要求Job必须有较强的健壮性,通常Job是批量处理数据、无人值守的,这要求在Job执行期间能够应对各种发生的异常、错误,并对Job执行进行有效的跟踪。
一个健壮的Job通常需要具备如下的几个特性:
1.容错性
在Job执行期间非致命的异常,Job执行框架应能够进行有效的容错处理,而不是让整个Job执行失败;通常只有致命的、导致业务不正确的异常才可以终止Job的执行。
2.可追踪性
Job执行期间任何发生错误的地方都需要进行有效的记录,方便后期对错误点进行有效的处理。例如在Job执行期间任何被忽略处理的记录行需要被有效的记录下来,应用程序维护人员可以针对被忽略的记录后续做有效的处理。
3.可重启性
Job执行期间如果因为异常导致失败,应该能够在失败的点重新启动Job;而不是从头开始重新执行Job。
框架提供了支持上面所有能力的特性,包括Skip(跳过记录处理)、Retry(重试给定的操作)、Restart(从错误点开始重新启动失败的Job):
Skip,在对数据处理期间,如果数据的某几条的格式不能满足要求,可以通过Skip跳过该行记录的处理,让Processor能够顺利的处理其余的记录行。Retry,将给定的操作进行多次重试,在某些情况下操作因为短暂的异常导致执行失败,如网络连接异常、并发处理异常等,可以通过重试的方式避免单次的失败,下次执行操作时候网络恢复正常,不再有并发的异常,这样通过重试的能力可以有效的避免这类短暂的异常。Restart,在Job执行失败后,可以通过重启功能来继续完成Job的执行。在重启时候,批处理框架允许在上次执行失败的点重新启动Job,而不是从头开始执行,这样可以大幅提高Job执行的效率。
对于扩展性,框架提供的扩展能力包括如下的四种模式:
MultithreadedStep多线程执行一个Step;ParallelStep通过多线程并行执行多个Step;RemoteChunking在远端节点上执行分布式Chunk操作;PartitioningStep对数据进行分区,并分开执行;
我们先来看第一种的实现MultithreadedStep:
批处理框架在Job执行时默认使用单个线程完成任务的执行,同时框架提供了线程池的支持(MultithreadedStep模式),可以在Step执行时候进行并行处理,这里的并行是指同一个Step使用线程池进行执行,同一个Step被并行的执行。使用tasklet的属性task-executor可以非常容易的将普通的Step变成多线程Step。
MultithreadedStep的实现示例:
需要注意的是SpringBatch框架提供的大部分的ItemReader、ItemWriter等操作都是线程不安全的。
可以通过扩展的方式显现线程安全的Step。
下面为大家展示一个扩展的实现:
需求:针对数据表的批量处理,实现线程安全的Step,并且支持重启能力,即在执行失败点可以记录批处理的状态。
对于示例中的数据库读取组件JdbcCursorItemReader,在设计数据库表时,在表中增加一个字段Flag,用于标识当前的记录是否已经读取并处理成功,如果处理成功则标识Flag=true,等下次重新读取的时候,对于已经成功读取且处理成功的记录直接跳过处理。
MultithreadedStep(多线程步)提供了多个线程执行一个Step的能力,但这种场景在实际的业务中使用的并不是非常多。
更多的业务场景是Job中不同的Step没有明确的先后顺序,可以在执行期并行的执行。
ParallelStep:提供单个节点横向扩展的能力
使用场景:StepA、StepB两个作业步由不同的线程执行,两者均执行完毕后,StepC才会被执行。
框架提供了并行Step的能力。可以通过Split元素来定义并行的作业流,并制定使用的线程池。
ParallelStep模式的执行效果如下:
每个作业步并行处理不同的记录,示例中三个作业步,处理同一张表中的不同数据。
并行Step提供了在一个节点上横向处理,但随着作业处理量的增加,有可能一台节点无法满足Job的处理,此时我们可以采用远程Step的方式将多个机器节点组合起来完成一个Job的处理。
RemoteChunking:远程Step技术本质上是将对Item读、写的处理逻辑进行分离;通常情况下读的逻辑放在一个节点进行操作,将写操作分发到另外的节点执行。
远程分块是一个把step进行技术分割的工作,不需要对处理数据的结构有明确了解。
任何输入源能够使用单进程读取并在动态分割后作为块发送给远程的工作进程。
远程进程实现了监听者模式,反馈请求、处理数据最终将处理结果异步返回。请求和返回之间的传输会被确保在发送者和单个消费者之间。
在Master节点,作业步负责读取数据,并将读取的数据通过远程技术发送到指定的远端节点上,进行处理,处理完毕后Master负责回收Remote端执行的情况。
在SpringBatch框架中通过两个核心的接口来完成远程Step的任务,分别是ChunkProvider与ChunkProcessor。
ChunkProvider:根据给定的ItemReader操作产生批量的Chunk操作;
ChunkProcessor:负责获取ChunkProvider产生的Chunk操作,执行具体的写逻辑;
SpringBatch中对远程Step没有默认的实现,但我们可以借助SI或者AMQP实现来实现远程通讯能力。
Step本地节点负责读取数据,并通过MessagingGateway将请求发送到远程Step上;远程Step提供了队列的监听器,当请求队列中有消息时候获取请求信息并交给ChunkHander负责处理。
接下来我们看下最后一种分区模式;PartitioningStep:分区模式需要对数据的结构有一定的了解,如主键的范围、待处理的文件的名字等。
这种模式的优点在于分区中每一个元素的处理器都能够像一个普通SpringBatch任务的单步一样运行,也不必去实现任何特殊的或是新的模式,来让他们能够更容易配置与测试。
通过分区可以实现以下的优点:
分区实现了更细粒度的扩展;基于分区可以实现高性能的数据切分;分区比远程通常具有更高的扩展性;分区后的处理逻辑,支持本地与远程两种模式;分区作业典型的可以分成两个处理阶段,数据分区、分区处理;
数据分区:根据特殊的规则(例如:根据文件名称,数据的唯一性标识,或者哈希算法)将数据进行合理的数据切片,为不同的切片生成数据执行上下文ExecutionContext、作业步执行器StepExecution。可以通过接口Partitioner生成自定义的分区逻辑,SpringBatch批处理框架默认实现了对多文件的实现org.springframework.batch.core.partition.support.MultiResourcePartitioner;也可以自行扩展接口Partitioner来实现自定义的分区逻辑。
分区处理:通过数据分区后,不同的数据已经被分配到不同的作业步执行器中,接下来需要交给分区处理器进行作业,分区处理器可以本地执行也可以远程执行被划分的作业。接口PartitionHandler定义了分区处理的逻辑,SpringBatch批处理框架默认实现了本地多线程的分区处理org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;也可以自行扩展接口PartitionHandler来实现自定义的分区处理逻辑。
SpringBatch框架提供了对文件分区的支持,实现类org.springframework.batch.core.partition.support.MultiResourcePartitioner提供了对文件分区的默认支持,根据文件名将不同的文件处理进行分区,提升处理的速度和效率,适合有大量小文件需要处理的场景。
示例展示了将不同文件分配到不同的作业步中,使用MultiResourcePartitioner进行分区,意味着每个文件会被分配到一个不同的分区中。如果有其它的分区规则,可以通过实现接口Partitioner来进行自定义的扩展。有兴趣的TX,可以自己实现基于数据库的分区能力哦。
总结一下,批处理框架在扩展性上提供了4中不同能力,每种都是各自的使用场景,我们可以根据实际的业务需要进行选择。
批处理框架的不足与增强
SpringBatch批处理框架虽然提供了4种不同的监控方式,但从目前的使用情况来看,都不是非常的友好。
通过DB直接查看,对于管理人员来讲,真的不忍直视;通过API实现自定义的查询,这是程序员的天堂,确实运维人员的地狱;提供了Web控制台,进行Job的监控和操作,目前提供的功能太,无法直接用于生产;提供JMX查询方式,对于非开发人员太不友好;
但在企业级应用中面对批量数据处理,仅仅提供批处理框架仅能满足批处理作业的快速开发、执行能力。
企业需要统一的批处理平台来处理复杂的企业批处理应用,批处理平台需要解决作业的统一调度、批处理作业的集中管理和管控、批处理作业的统一监控等能力。
那完美的解决方案是什么呢?
关注我:转发私信回复“架构资料”获取Java高级架构资料、源码、笔记、视频
Dubbo、Redis、设计模式、Netty、zookeeper、Springcloud、分布式、微服务
高并发等架构技术
企业级批处理平台需要在SpringBatch批处理框架的基础上,集成调度框架,通过调度框架可以将任务按照企业的需求进行任务的定期执行;
丰富目前SpringBatchAdmin(SpringBatch的管理监控平台,目前能力比较薄弱)框架,提供对Job的统一管理功能,增强Job作业的监控、预警等能力;
通过与企业的组织机构、权限管理、认证系统进行合理的集成,增强平台对Job作业的权限控制、安全管理能力。
由于时间关系,今天的分享就到这里,很多内容未能展开讨论。欢迎大家在实际业务中使用SpringBatch框架。
最后的话
觉得还不错可以转发关注支持一波~私信架构资料获取一些我私人整理的Java进阶资料!
为什么某些人会一直比你优秀,是因为他本身就很优秀还一直在持续努力变得更优秀。而你是不是还在满足于现状且内心在窃喜?“对于程序员来说,如果哪一天开始他停止了学习,那么他的职业生涯便开始宣告消亡。”所以行动起来,学习起来!
FREE SOLO - 自己动手实现Raft - - leveldb源码分析与调试-2
继续探讨leveldb的内部操作,首先解析写入过程。write-batch和leveldb key是核心数据结构,它们在数据写入中的角色至关重要。
1. 数据写入流程:当通过DBImpl::Put或DB::Put添加键值对时,数据会被封装成write-batch。这个batch随后交给DBImpl::Write,最终由log::Writer::AddRecord负责将数据写入log。这样,数据便有了持久化的记录。
2. 写入memtable:写入log后,数据还会被添加到memtable,便于快速查询。同样,DBImpl::Write通过MemTableInserter::Put调用MemTable::Add,将数据写入memtable,形成内存中的临时存储。
3. 数据读取:对于查询,DBImpl::Get是起点,通过MemTable::Get调用SkipList::FindGreaterOrEqual在SortedTable的SkipList中搜索,提供即时的数据访问。
总结:通过上述调用栈,我们可以对leveldb的写入和读取有更深入的理解。在后续的内容中,我们将关注大量数据写入对内存和磁盘影响的详细分析。
期待在下次与您分享更多内容,再见!
联系信息:email: castermode@gmail.com | 网站:vectordb.io | 项目未指定
BatchNorm理解(含Pytorch部分源码)
深度学习中,数据归一化是关键。神经网络学习数据分布以在测试集上达到泛化效果。然而,若每个batch输入数据分布不同,即Covariate Shift,这会带来训练挑战。数据经过多层网络后,分布发生改变,形成Internal Covariate Shift,这进一步增加了下层网络学习的难度。为解决中间层Internal Covariate Shift问题,引入了Batch Normalization(BN)操作。
BN算法流程如下:
(1)计算输入批量数据的均值。
(2)计算输入批量数据的方差。
(3)对每个数据进行归一化。
(4)引入缩放变量和平移变量,通过训练更新,计算归一化后的值。
BN中均值方差计算基于张量数据,通常维度为[N, H, W, C]。其中N为batch_size,H和W为特征图尺寸,C为通道数。均值计算是每个通道内数字总和除以[N, H, W]。例如,对于[2,2,2,3]输入,代表2个batch,每个batch有3个特征图(通道数为3),每个特征图大小为2*2。以通道1为例,计算步骤如下:
均值计算公式为:均值=(所有数字总和)/ [N, H, W]。
最终获得三个通道的均值和方差,网络更新参数,为每一个channel对应一个缩放变量和平移变量。
在Pytorch中,BN通过_NormBase类和_BatchNorm类实现。_NormBase类定义BN相关的属性,_BatchNorm类继承自_NormBase,是BatchNorm2d实际调用的类。具体源码包括定义属性、计算均值和方差、归一化以及参数更新等关键步骤。