皮皮网
皮皮网

【同城分销源码】【r语言源码官网】【易语言直播伴侣源码】flumesource源码修改

来源:51A源码 发表时间:2025-01-11 21:33:38

1.Flume的码修Source,Sink,码修Channel的码修作用?你们Source是什么类型?
2.flume 自定义 hbase sink
3.Flume常用Source、Channel、码修sink组件类型选型
4.Flume面试题
5.flume中的agernt包含了哪三个组件
6.flume工作进程由什么组件构成

flumesource源码修改

Flume的码修Source,Sink,码修同城分销源码Channel的码修作用?你们Source是什么类型?

       理解Flume的架构和性能优化至关重要。Flume设计原理确保数据不丢失,码修但可能引起重复,码修这取决于Sink响应情况。码修

       优化Source,码修通过增加个数或配置多个FileGroups,码修可提升数据读取能力。码修batchSize参数调整有助于提高数据传输效率。码修

       Channel选择影响性能与容错性。码修memory类型性能最佳,但易丢失数据;file类型容错性更强,配置多个不同盘目录可优化性能。注意,Channel容量和事务容量需与Source和Sink的batchSize参数相协调。

       Sink优化关注增加个数以提升消费能力,r语言源码官网避免过度配置导致资源浪费。适当调整batchSize参数可优化数据处理速度。

flume 自定义 hbase sink

        业务需求 flume需要从kafka获取数据并写入hbase

        开始写的想法:按照flume的流程:一个source ,三个channel, 三个sink,因为我需要三个列族,如果使用官方的hbase sink那么需要三个sink。而且需要自定义一个source的拦截器,根据kafka获取的数据匹配不不同的channel,三个channel对应三个列族,然后配置到sink,就可以使用官方hbase的sink插入数据了。

        实现:

1. 自定义一个拦截器

        自定义拦截器

        将自定义拦截器打成jar包,放到flume的lib目录,有依赖的包也需要将jar包一并放入,不然会报找不到包异常

        conf/flume-diysource.conf 配置信息

        一切准备就绪flume启动命令

        控制台打印信息没报什么错误

        查看hbase,hbase的列族是分对了,但是他把整一个kafka读取的数据当做一个value写入一列,而且列名是默认的,并不是我想要的。

于是..............diy开始了

        当我在看flume的时候看到关于kafka channel是 这样写的

        思路:根据 3 我们可以不需要再写一个source,直接channel 到 sink 一撸到底,只需要在sink上进行habse的相关操作

        直接自定义 sink

        依赖信息

        自定义类 MyHbaseSink

        自定义完毕,开始配置文件 ,这个配置就比较简单 conf/flume-diysource.conf 文件

        一切准备好了,将自定义的sink打成jar包,放到flume,直接运行

        后台运行

        完美运行,habse在哗啦啦的写入!

        总结: 业务需求是将kafka的数据写入到hbase,开始是想用官方的sink,结果是我太天真了,官方的hbase sink的rowKey并不满足业务需求,而且kafka的数据字段是不确定的,搞了半天,白忙活。发现自己定义的比较符合业务需求。

        但是,自定义的sink也是比较坑的,开始自定sink,我把处理event的逻辑全都放在process() 方法里,结果很抽风,根本就没有执行到我的逻辑里,然后我在官网process()方法的上看到这么一句话: Send the Event to the external repository.

大概意思是让我把event发送到外部库,于是我把event处理独立出process()。结果amazing,成功写入hbase了,果然运气不错!

Flume常用Source、Channel、sink组件类型选型

       在Flume中,常用组件的选型与应用对数据收集与传输至关重要。以下将分别介绍Source、Channel、sink组件的常用类型与使用方法。

       Source组件

       1. netcat 类型:用于监听指定端口,收集端口数据,适用于实时监控与数据接收。

       例如:检测端口是否被占用,使用命令 `netstat -nlp | grep 端口号` 打印到控制台。

       2. Exec 类型:可以将命令的输出作为数据源,适用于监控命令行输出结果。

       3. spooldir 类型:监控目录下的文件,实时读取目录文件到HDFS,适用于实时追踪文件变化。

       4. taildir 类型:监控文件内容,易语言直播伴侣源码适合于监听实时追加的文件。与Spooldir相比,Taildir支持断点续传。

       5. Kafka:支持从Kafka主题中读取数据,适用于大规模数据流处理。支持多种版本,最新测试支持到2.0.1。

       6. Avro:结合Avro sink使用,用于数据序列化与传输,适用于复杂数据结构的处理。

       Channel组件

       1. Memory:基于内存存储事件,传输速度快,适用于数据量较小或允许数据丢失的场景。

       例如:在监控文件变动的场景中,Memory Channel用于实时传输数据。

       2. File:事件保存在本地文件中,数据恢复性高,但传输速度相对较慢。

       3. JDBC:事件保存在关系型数据库中,适用于需要持久化存储的c gui qt 4源码数据。

       sink组件

       1. HDFS:将事件写入Hadoop分布式文件系统,支持文本和序列文件,适用于大数据存储与处理。

       例如:实时监控文件变动,数据被直接写入HDFS。

       2. Avro:用于多Agent级联场景,如两个Agent串联或多个Agent多路复用数据传输。

       3. Hive:将事件直接传输到Hive表或分区,适用于实时查询与数据处理。

       4. Logger:用于测试或日志输出,提供事件记录。

       5. FailoverSinkProcessor:实现故障转移功能,确保数据传输的可靠性。

       在实际应用中,选择组件时需考虑数据量、实时性、持久化与可靠性等因素。例如,使用Memory Channel与Avro sink在数据量较小且允许数据丢失的场景下,实现高效的php图书商城源码数据收集与传输。

Flume面试题

       Flume架构原理确保数据不会丢失,内部有完善的事务机制。数据从Source到Channel,以及从Channel到Sink均是事务性的,因此在正常运行时不会出现数据丢失情况。唯一可能丢失数据的是当使用memoryChannel时,若agent宕机导致数据丢失,或是Channel存储满导致未写入数据丢失。

       Flume和Kafka在数据采集层各有优势。Flume是一个管道流方式的工具,提供了丰富的默认实现和扩展API,主要用于往HDFS或HBase发送数据。Kafka则是一个分布式的消息队列,具有通用性,支持多个生产者和消费者共享多个主题。Kafka在多个系统间共享数据时更优,而Flume专为Hadoop设计,内置多种source和sink组件,支持实时数据处理和拦截器。Flume在数据流处理上表现良好,Kafka则需要配合流处理系统使用。如果数据最终用于Hadoop,则Flume更为合适,但Kafka也支持与Flume结合使用。

       Flume与Kafka的结合使用可以实现数据的高可用性。Kafka提供容错机制,确保零数据丢失,但不支持副本事件。Flume的宕机数据丢失问题可以通过集群或主备模式解决。Flume采集日志通过流式直接传输到存储层,而Kafka则缓存数据在集群中,后端采集存储。若Flume采集中断,可以采用文件系统记录日志,而Kafka则使用offset记录。

       Flume组件包括source、channel和sink。source负责采集数据,将数据流传输到channel;channel作为桥梁,类似于队列,连接source和sink;sink从channel收集数据,并将数据写入目标源,如HDFS、HBase等。使用Flume的主要原因在于其高效的数据采集能力,支持多种数据源,如web服务器日志等。

       Flume组成架构包括source、channel和sink,以及内部事务机制。source消耗外部数据源的事件,channel作为数据缓冲区连接source和sink,sink则持续轮询channel中的事件并批量写入存储或索引系统。Flume自带内存和文件channel,其中内存channel不适用于关注数据丢失的场景。若需要关心数据完整性,应使用文件channel。其他channel如JDBC通道等也存在。sink组件目标包括HDFS、logger、avro、thrift等,实现数据的最终存储或发送。

       Flume的事务机制与数据库类似,确保数据流的完整性和一致性。事务机制在source到channel及channel到sink的事件传递过程中分别启动,确保数据的正确处理和存储。spooling directory source会为文件的每一行创建事件,确保事务中所有事件的完整传递。事务处理流程包括数据的创建、提交或回滚,以确保数据的一致性和完整性。所有的事件都会保持在channel中,以便在发生异常时进行重试或回滚操作。

flume中的agernt包含了哪三个组件

Agent中包含了三个重要的组件,Source,Channel,Sink。

       Source是从其他生产数据的应用中接受数据的组件。Source可以监听一个或者多个网络端口,用于接受数据或者从本地文件系统中读取数据,每个Source必须至少连接一个Channel。当然一个Source也可以连接多个Channnel,这取决于系统设计的需要。Channel主要是用来缓冲Agent以及接受,但尚未写出到另外一个Agent或者存储系统的数据。Channel的行为比较像队列,Source写入到他们,Sink从他们中读取数据。多个Source可以安全的写入到同一Channel中,并且多个Sink可以从同一个Channel中读取数据。可是一个Sink只能从一个Channel读取数据,如果多个Sink从相同的Channel中读取数据,系统可以保证只有一个Sink会从Channel读取一个特定的事件。Sink会连续轮训各自的Channel来读取和删除事件。Sink将事件推送到下一阶段(RPCSink的情况下),或者到达最终目的地。一旦在下一阶段或者其目的地中数据是安全的,Sink通过事务提交通知Channel,可以从Channel中删除这一事件。

       Agent组件实质上是Multi-Client组件。因为其与Server组件采用相同的技术架构。一个Agent组件对象可同时建立和高效处理大规模Socket连接。所以,Agent组件本质上是Client组件。一个Agent对象能同时管理多个客户端连接。

flume工作进程由什么组件构成

       Flume工作进程主要由以下组件构成

       1. 源(Source)组件

       Flume的源组件负责接收各种类型的数据,是数据进入Flume的第一个组件。常见的源包括Avro、Kafka、Twitter等,它们可以从不同的应用场景或系统中捕获数据。这些源能够将数据发送至Flume进行后续的传输和处理。

       2. 通道(Channel)组件

       通道是Flume中用于缓存数据的一个组件,它充当数据的临时存储库。当源组件接收到数据时,这些数据首先存储在通道中。通道的设计是为了实现数据的缓冲功能,确保数据的传输不会因为速度差异或其他因素而中断。Flume支持多种类型的通道,如内存通道、文件通道等。

       3. 目的地(Destination)组件

       目的地是Flume中数据的最终去处。当数据从通道中被取出时,目的地组件负责将这些数据发送到指定的目标,如Hadoop、HDFS、数据库或其他系统。目的地组件确保数据能够准确地送达其目的地并进行相应的处理。

       详细解释

       Flume作为一个分布式的数据收集、聚合和传输系统,其核心工作原理是通过上述三个组件实现的。源组件负责从外部系统捕获数据,这些数据被收集后存储在通道中,形成一个临时的数据存储队列。随后,目的地组件从通道中取出数据并将其发送到最终的目标系统。在这个过程中,通道起到了缓冲的作用,确保数据的传输不会因为各种原因而中断。这三个组件协同工作,使得Flume能够有效地在分布式系统中进行数据的传输和处理。通过配置不同的源、通道和目的地,Flume可以灵活地适应不同的应用场景和需求。

flume 的source 、channel和sink 多种组合

       flume 有三大组件source 、channel和sink,各个组件之间都可以相互组合使用,各组件间耦合度低。使用灵活,方便。

        1.多sink

        channel 的内容只输出一次,同一个event 如果sink1 输出,sink2 不输出;如果sink1 输出,sink1 不输出。 最终 sink1+sink2=channel 中的数据。

        配置文件如下:

        a1.sources=r1a1.sinks= k1 k2a1.channels= c1# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# channela1.channels.c1.type= memorya1.channels.c1.capacity=a1.channels.c1.transactionCapacity=#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:a1.sinks.k1.kafka.flumeBatchSize=a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c1#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp

        2.多 channel 多sink ,每个sink 输出内容一致

        (memory channel 用于kafka操作,实时性高,file channel 用于 sink file 数据安全性高) 

        (多channel 单 sink 的情况没有举例,个人感觉用处不广泛。)

        配置文件如下:

        a1.sources=r1a1.sinks= k1 k2a1.channels= c1 c2# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1 c2a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log#多个channel 的数据相同a1.sources.r1.selector.type=replicating# channel1a1.channels.c1.type= memorya1.channels.c1.capacity=a1.channels.c1.transactionCapacity=#channel2a1.channels.c2.type= filea1.channels.c2.checkpointDir= /opt/apps/flume-1.7.0/checkpointa1.channels.c2.dataDirs= /opt/apps/flume-1.7.0/data#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:a1.sinks.k1.kafka.flumeBatchSize=a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c2#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp

        3. 多source 单 channel 单 sink

        多个source 可以读取多种信息放在一个channel 然后输出到同一个地方 

        配置文件如下:

        a1.sources=r1r2a1.sinks= k1a1.channels= c1# source1a1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# source2a1.sources.r2.type= execa1.sources.r2.shell= /bin/bash -ca1.sources.r2.channels= c1a1.sources.r2.command= tail -F /opt/apps/logs/tail2.log# channel1  in memorya1.channels.c1.type= memorya1.channels.c1.capacity=a1.channels.c1.transactionCapacity=#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:a1.sinks.k1.kafka.flumeBatchSize=a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy

        flume 像乐高积木一样可以自己随心所欲将不同的组件进行搭配使用,耦合度低。

       Source

        rpc远程过程调用协议,客户机与服务机的调用模式需要对数据进行序列化。

                 1:客户机将参数序列化并以二进制形式通过网络传输到服务器。

                 2:服务器接收到后进行反序列化再调用方法获取返回值。

                 3:服务器将返回值序列化后再通过网络传输给客户机。

                 4:客户机接收到结果后再进行反序列化获取结果。

        Avro source:

                 Avro就是一种序列化形式,avrosource监听一个端口只接收avro序列化后的数据,其他类型的不接收。

                 type:avrosource的类型,必须是avro。

        bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。

        port:绑定的本地的端口。

       Thrif source:

                 和avro一样是一种数据序列化形式,Thrifsource只采集thrift数据序列化后的数据

       Exec source:

                 采集linux命令的返回结果传输给channel

                 type:source的类型:必须是exec。

        command:要执行命令。

        tail  –f  若文件被删除即使重新创建同名文件也不会监听

                tail  -F  只要文件同名就可以继续监听

        以上可以用在日志文件切割时的监听

       JMS Source:

        Java消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;

       Spooling Directory Source:通过文件夹里的新增的文件作为数据源的采集;

       Kafka Source:从kafka服务中采集数据。

       NetCat Source:绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入

                type:source的类型,必须是netcat。

        bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。

        port:绑定的本地的端口。

       HTTP Source:监听HTTP POST和 GET产生的数据的采集

       Chanel

                 是一个数据存储池,中间通道,从source中接收数据再向sink目的地传输,如果sink写入失败会自动重写因此不会造成数据丢失。

                 Memory:用内存存储,但服务器宕机会丢失数据。

                         Typechannel的类型:必须为memory

        capacity:channel中的最大event数目

        transactionCapacity:channel中允许事务的最大event数目

                File:使用文件存储数据不会丢失数据但会耗费io。

                         Typechannel的类型:必须为 file

        checkpointDir :检查点的数据存储目录

        dataDirs :数据的存储目录

        transactionCapacity:channel中允许事务的最大event数目

                SpillableMemory Channel:内存文件综合使用,先存入内存达到阀值后flush到文件中。

                        Typechannel的类型:必须为SPILLABLEMEMORY

        memoryCapacity:内存的容量event数

        overflowCapacity:数据存到文件的event阀值数

        checkpointDir:检查点的数据存储目录

        dataDirs:数据的存储目录

                Jdbc:使用jdbc数据源来存储数据。

                 Kafka:使用kafka服务来存储数据。

       Sink

                 各种类型的目的地,接收channel写入的数据并以指定的形式表现出来。Sink有很多种类型。

        type:sink的类型 必须是hdfs。

        hdfs.path:hdfs的上传路径。

        hdfs.filePrefix:hdfs文件的前缀。默认是:FlumeData

        hdfs.rollInterval:间隔多久产生新文件,默认是:(秒) 0表示不以时间间隔为准。

        hdfs.rollSize:文件到达多大再产生一个新文件,默认是:(bytes)0表示不以文件大小为准。

        hdfs.rollCount:event达到多大再产生一个新文件,默认是:(个)0表示不以event数目为准。

        hdfs.batchSize:每次往hdfs里提交多少个event,默认为

        hdfs.fileType:hdfs文件的格式主要包括:SequenceFile,DataStream ,CompressedStream,如果使用了CompressedStream就要设置压缩方式。

        hdfs.codeC:压缩方式:gzip,bzip2, lzo, lzop, snappy

        注:%{ host}可以使用header的key。以及%Y%m%d来表示时间,但关于时间的表示需要在header里有timestamp这个key。

       Logger Sink将数据作为日志处理(根据flume中的设置的日志方式来显示)

        要在控制台显示在运行agent的时候加入:-Dflume.root.logger=INFO,console。

        type:sink的类型:必须是logger。

        maxBytesToLog:打印body的最长的字节数 默认为

       Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。

                         type:sink的类型:必须是 avro。

        hostname:指定发送数据的主机名或者ip

        port:指定发送数据的端口

        实例

        1:监听一个文件的增加变化,采集数据并在控制台打印。

        在这个例子中我使用exec source,memory chanel,logger sink。可以看我的agent结构图

        以下是我创建的exec_source.conf

        a1.sources=r1

        a1.channels=c1

        a1.sinks=k1

       a1.sources.r1.type=exec

        a1.sources.r1.command=tail -F/usr/local/success.log

       a1.channels.c1.type=memory

        a1.channels.c1.capacity=

        a1.channels.c1.transactioncapacity=

       a1.sinks.k1.type=logger

       a1.sources.r1.channels=c1

        a1.sinks.k1.channel=c1

       æ‰§è¡Œå‘½ä»¤ï¼š

        bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &

       ç„¶åŽæ›´æ”¹/usr/local/success.log文件中的内容后可以看到flume采集到了文件的变化并在控制台上打印出来。文件初始内容hello和how are you,剩下的i am fine和ok为新增加内容。

        2:监控一个文件变化并将其发送到另一个服务器上然后打印

        这个例子可以建立在上一个例子之上,但是需要对flume的结构做一些修改,我使用avro序列化数据再发送到指定的服务器上。详情看结构图。

        实际上flume可以进行多个节点关联,本例中我只使用向发送数据

        ,上都必须启动agent

        服务器配置

        以下是我创建的exec_source_avro_sink.conf

        a1.sources=r1

        a1.channels=c1

        a1.sinks=k1

       a1.sources.r1.type=exec

        a1.sources.r1.command=tail -F/usr/local/success.log

       a1.channels.c1.type=memory

        a1.channels.c1.capacity=

        a1.channels.c1.transactioncapacity=

       a1.sinks.k1.type=avro

        a1.sinks.k1.hostname=...

        a1.sinks.k1.port=

       a1.sources.r1.channels=c1

        a1.sinks.k1.channel=c1

       æ‰§è¡Œå‘½ä»¤å¯åŠ¨agent

        bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&

       æœåŠ¡å™¨é…ç½®

        执行命令拷贝flume到

        scp -r apache-flume-1.7.0-bin/root@...:/usr/local/

        修改exec_source_avro_sink.conf

        a1.sources=r1

        a1.channels=c1

        a1.sinks=k1

       a1.sources.r1.type=avro

        a1.sources.r1.bind=0.0.0.0

        a1.sources.r1.port=

       a1.channels.c1.type=memory

        a1.channels.c1.capacity=

        a1.channels.c1.transactioncapacity=

       a1.sinks.k1.type=logger

        a1.sources.r1.channels=c1

        a1.sinks.k1.channel=c1

       æ‰§è¡Œå‘½ä»¤å¯åŠ¨agent

        bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&

       ç»“果可以在控制台上看到中修改success.log的变化信息

        3:avro-client实例

        执行bin/flume-ng会提示有命令如下

        help                     display this help text

        agent                     run aFlume agent

        avro-client               run anavro Flume client

        version                   show Flume version info

       avro-clinet是avro客户端,可以把本地文件以avro序列化方式序列化后发送到指定的服务器端口。本例就是将的一个文件一次性的发送到中并打印。

        Agent结构图如下

        启动的是一个avro-client,它会建立连接,发送数据,断开连接,它只是一个客户端。

        启动一个avro客户端

        bin/flume-ngavro-client --conf conf/ --host ... --port --filename/usr/local/success.log --headerFile /usr/local/kv.log

       --headerFile是用来区分是哪个服务器发送的数据,kv.log中的内容会被发送到,可以作为标识来使用。

       çš„avro_client.conf如下

        a1.sources=r1

        a1.channels=c1

        a1.sinks=k1

       a1.sources.r1.type=avro

        a1.sources.r1.bind=0.0.0.0

        a1.sources.r1.port=

       a1.channels.c1.type=memory

        a1.channels.c1.capacity=

        a1.channels.c1.transactioncapacity=

       a1.sinks.k1.type=logger

       a1.sources.r1.channels=c1

        a1.sinks.k1.channel=c1

       å¯åŠ¨agent

        bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &

       æŽ§åˆ¶å°æ˜¾ç¤ºå¦‚下

        可以看到headers的内容headers:{ hostname=...}

        注意:

        1:Flume服务没有stop命令需要通过kill来杀掉进行,可以使用jps  -m来确认是那个agent的number

        [root@shb conf]# jps -m

        Jps -m

        Application --conf-fileconf/exec_source.conf --name a1

       2:修改flume的配置文件后如avro_client.conf,flume会自动重启

       3:logger sink默认只显示个字节

       4:flume是以event为单位进行数据传输的,其中headers是一个map容器map

        Event: { headers:{ hostname=...}body:                                            1a }

       5:flume支持多节点关联但是sink和source的类型要一致,比如avro-client发送数据那么接收方的source也必须是avro否则会警告。

相关栏目:综合