博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume-ng 配置channel轮询负载均衡
阅读量:7046 次
发布时间:2019-06-28

本文共 12632 字,大约阅读时间需要 42 分钟。

hot3.png

在修改之前,先通过源码分析 source 选择 channel 时的核心处理类:org.apache.flume.channel.ChannelProcessor

public void processEventBatch(List
events) { Preconditions.checkNotNull(events, "Event list must not be null"); events = interceptorChain.intercept(events); Map
> reqChannelQueue = new LinkedHashMap
>(); Map
> optChannelQueue = new LinkedHashMap
>(); for (Event event : events) { List
reqChannels = selector.getRequiredChannels(event); for (Channel ch : reqChannels) { List
eventQueue = reqChannelQueue.get(ch); if (eventQueue == null) { eventQueue = new ArrayList
(); reqChannelQueue.put(ch, eventQueue); } eventQueue.add(event); } List
optChannels = selector.getOptionalChannels(event); for (Channel ch: optChannels) { List
eventQueue = optChannelQueue.get(ch); if (eventQueue == null) { eventQueue = new ArrayList
(); optChannelQueue.put(ch, eventQueue); } eventQueue.add(event); } } // Process required channels for (Channel reqChannel : reqChannelQueue.keySet()) { Transaction tx = reqChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); List
batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { reqChannel.put(event); } tx.commit(); } catch (Throwable t) { tx.rollback(); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; } else { throw new ChannelException("Unable to put batch on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close(); } } } // Process optional channels for (Channel optChannel : optChannelQueue.keySet()) { Transaction tx = optChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); List
batch = optChannelQueue.get(optChannel); for (Event event : batch ) { optChannel.put(event); } tx.commit(); } catch (Throwable t) { tx.rollback(); LOG.error("Unable to put batch on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; } } finally { if (tx != null) { tx.close(); } } } }

这个方法中的 selector 属性就是 channel 选择器,它通过如下配置项指定:

自定义agent名称.sources.自定义source源名称.selector.type

selector.getRequiredChannels 方法决定哪些 channel 必须接收该事件,selector.getOptionalChannels 方法则按 optional 规则选取可选的 channel 来发送事件。要以负载均衡的方式处理 source 发来的事件,可以在这里配置自定义的 selector 规则类:新增 LbChannelSelector 类(继承 AbstractChannelSelector),参照 MultiplexingChannelSelector 的实现重写(ps:按 optional 规则选取 channel 的部分沿用 Flume 的默认处理),并为 selector 增加一个 polling 属性,用来配置需要做负载均衡的 channel 名称。在配置中加入:

自定义agent名称.sources.自定义source源名称.selector.polling = ch1 ch2 ch3

后面的ch1 ch2 ch3 是配置好的channel 的名称

同时修改 getRequiredChannels 方法,在其中加入对 polling 属性的支持,使其以轮询的方式返回 source 需要发送事件的 channel。完整的负载均衡代码如下:

import java.util.*;import org.apache.commons.collections.CollectionUtils;import org.apache.commons.lang3.StringUtils;import org.apache.flume.Channel;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.channel.AbstractChannelSelector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * The type Load balance channel selector. * * @author pengming   * @date 2016年11月22日 16:38:49 * @description */public class LbChannelSelector extends AbstractChannelSelector {    /**     * The constant CONFIG_MULTIPLEX_HEADER_NAME.     */    public static final String CONFIG_MULTIPLEX_HEADER_NAME = "header";    /**     * The constant DEFAULT_MULTIPLEX_HEADER.     */    public static final String DEFAULT_MULTIPLEX_HEADER = "flume.selector.header";    /**     * The constant CONFIG_PREFIX_MAPPING.     */    public static final String CONFIG_PREFIX_MAPPING = "mapping.";    /**     * The constant CONFIG_DEFAULT_CHANNEL.     */    public static final String CONFIG_DEFAULT_CHANNEL = "default";    public static final String CONFIG_PREFIX_POLLING = "polling";    public static final String CONFIG_PREFIX_COPY = "copy";    /**     * The constant CONFIG_PREFIX_OPTIONAL.     */    public static final String CONFIG_PREFIX_OPTIONAL = "optional";    @SuppressWarnings("unused")    private static final Logger LOG = LoggerFactory.getLogger(LbChannelSelector.class);    private static final List
EMPTY_LIST = Collections.emptyList(); private String headerName; private Map
> channelMapping; private Map
> optionalChannels; private List
defaultChannels; /** 复制事件到对应的channels中 */ private List
copyChannels; /** 轮询loadBalanceChannels 的下标 */ private volatile int index = 0; /** 轮询负载均衡到该list 中的 channel */ private List
loadBalanceChannels; @Override public List
getRequiredChannels(Event event) { /** 根据header 配置值来找到对应处理的 channel */ String headerValue = event.getHeaders().get(headerName); if (StringUtils.isNotEmpty(StringUtils.trim(headerValue))) { List
channels = channelMapping.get(headerValue); if (CollectionUtils.isNotEmpty(channels)) { return channels; } } /** 复制 */ if (CollectionUtils.isNotEmpty(copyChannels)) { return copyChannels; } /** 轮询选择 channel */ if (CollectionUtils.isNotEmpty(loadBalanceChannels)) { return getLbChannels(); } return defaultChannels; } public synchronized List
getLbChannels() { List
channels = new ArrayList<>(1); channels.add(loadBalanceChannels.get(index)); if (index++ >= (loadBalanceChannels.size() - 1)) { index = 0; } return channels; } @Override public List
getOptionalChannels(Event event) { String hdr = event.getHeaders().get(headerName); List
channels = optionalChannels.get(hdr); if (channels == null) { channels = EMPTY_LIST; } return channels; } @Override public void configure(Context context) { Map
channelNameMap = getChannelNameMap(); /** 设置header */ this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME, DEFAULT_MULTIPLEX_HEADER); /** 默认处理事件的channel */ defaultChannels = getChannelListFromNames(context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap); LOG.info("默认处理 defaultChannelsSize: " + defaultChannels.size()); if (CollectionUtils.isEmpty(defaultChannels)) { defaultChannels = EMPTY_LIST; } /** 配置负载均衡,轮询的channel */ loadBalanceChannels = getChannelListFromNames(context.getString(CONFIG_PREFIX_POLLING), channelNameMap); LOG.info("轮询负载 loadBalanceSize: " + loadBalanceChannels.size()); if (CollectionUtils.isEmpty(loadBalanceChannels)) { loadBalanceChannels = EMPTY_LIST; } /** 复制事件到对应的channel 中 */ copyChannels = getChannelListFromNames(context.getString(CONFIG_PREFIX_COPY), channelNameMap); LOG.info("复制处理 copyChannelsSize: " + copyChannels.size()); if (CollectionUtils.isEmpty(copyChannels)) { copyChannels = EMPTY_LIST; } /** 设置header 值对应的 channel 映射处理 */ Map
mapConfig = context.getSubProperties(CONFIG_PREFIX_MAPPING); channelMapping = new HashMap
>(); for (String headerValue : mapConfig.keySet()) { List
configuredChannels = getChannelListFromNames(mapConfig.get(headerValue), channelNameMap); //This should not go to default channel(s) //because this seems to be a bad way to configure. if (configuredChannels.size() == 0) { throw new FlumeException("No channel configured for when " + "header value is: " + headerValue); } if (channelMapping.put(headerValue, configuredChannels) != null) { throw new FlumeException("Selector channel configured twice"); } } //If no mapping is configured, it is ok. //All events will go to the default channel(s). /** 配置 其他规则, 如果与默认配置channel冲突则删除, 发送到对应的channel 与 ChannelProcessor 处理相关 */ Map
optionalChannelsMapping = context.getSubProperties(CONFIG_PREFIX_OPTIONAL + "."); optionalChannels = new HashMap
>(); for (String hdr : optionalChannelsMapping.keySet()) { List
confChannels = getChannelListFromNames(optionalChannelsMapping.get(hdr), channelNameMap); if (confChannels.isEmpty()) { confChannels = EMPTY_LIST; } //Remove channels from optional channels, which are already //configured to be required channels. List
reqdChannels = channelMapping.get(hdr); //Check if there are required channels, else defaults to default channels if (CollectionUtils.isEmpty(reqdChannels)) { reqdChannels = defaultChannels; } for (Channel c : reqdChannels) { if (confChannels.contains(c)) { confChannels.remove(c); } } if (optionalChannels.put(hdr, confChannels) != null) { throw new FlumeException("Selector channel configured twice"); } } }}

整体的flume配置:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = src1
agent.channels = ch1 ch2 ch3
agent.sinks = sink1 sink2 sink3

# For each one of the sources, the type is defined
agent.sources.src1.type=com.flume.source.HTTPSource
#dev
#agent.sources.src1.bind=10.10.10.79
#test
agent.sources.src1.bind=10.20.22.223
#agent.sources.src1.bind=10.20.22.225
agent.sources.src1.port=8990

# The channel can be defined as follows.
agent.sources.src1.channels = ch1 ch2 ch3
agent.sources.src1.selector.type = com.flume.source.LbChannelSelector
agent.sources.src1.selector.default = ch1
agent.sources.src1.selector.polling = ch1 ch2 ch3
agent.sources.src1.handler = com.flume.source.JsonHandler
#agent.sources.src1.batchSize = 1000
agent.sources.src1.keep-alive = 3

# Each sink's type must be defined
#agent.sinks.sink1.type = logger
#agent.sinks.sink2.type = logger

#Specify the channel the sink should use
agent.sinks.sink1.channel = ch1
agent.sinks.sink1.type = com.flume.sink.KafkaSink
agent.sinks.sink1.useFlumeEventFormat = true
agent.sinks.sink1.kafka.topic = dev_tag
agent.sinks.sink1.kafka.bootstrap.servers = wx-kafka-03:9092,wx-kafka-04:9092,wx-kafka-05:9092,wx-kafka-06:9092,wx-kafka-07:9092,wx-kafka-08:9092
agent.sinks.sink1.flumeBatchSize = 1000
agent.sinks.sink1.kafka.producer.acks = 1
agent.sinks.sink1.kafka.producer.linger.ms = 1
agent.sinks.sink1.kafka.producer.compression.type = snappy

agent.sinks.sink2.channel = ch2
agent.sinks.sink2.type = com.flume.sink.KafkaSink
agent.sinks.sink2.useFlumeEventFormat = true
agent.sinks.sink2.kafka.topic = dev_tag
agent.sinks.sink2.kafka.bootstrap.servers = wx-kafka-03:9092,wx-kafka-04:9092,wx-kafka-05:9092,wx-kafka-06:9092,wx-kafka-07:9092,wx-kafka-08:9092
agent.sinks.sink2.flumeBatchSize = 1000
agent.sinks.sink2.kafka.producer.acks = 1
agent.sinks.sink2.kafka.producer.linger.ms = 1
agent.sinks.sink2.kafka.producer.compression.type = snappy

agent.sinks.sink3.channel = ch3
agent.sinks.sink3.type = com.flume.sink.KafkaSink
agent.sinks.sink3.useFlumeEventFormat = true
agent.sinks.sink3.kafka.topic = dev_tag
agent.sinks.sink3.kafka.bootstrap.servers = wx-kafka-03:9092,wx-kafka-04:9092,wx-kafka-05:9092,wx-kafka-06:9092,wx-kafka-07:9092,wx-kafka-08:9092
agent.sinks.sink3.flumeBatchSize = 1000
agent.sinks.sink3.kafka.producer.acks = 1
agent.sinks.sink3.kafka.producer.linger.ms = 1
agent.sinks.sink3.kafka.producer.compression.type = snappy

#agent.sinkgroups = g1
#agent.sinkgroups.g1.sinks = sink1 sink2 sink3
#agent.sinkgroups.g1.processor.type = load_balance
#agent.sinkgroups.g1.processor.backoff=true
#agent.sinkgroups.g1.processor.selector=round_robin
#agent.sinkgroups.g1.processor.priority.sink1 = 9
#agent.sinkgroups.g1.processor.priority.sink2 = 7
#agent.sinkgroups.g1.processor.priority.sink3 = 8
#agent.sinkgroups.g1.processor.maxpenalty = 10000

# Each channel's type is defined.
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 100000
agent.channels.ch1.transactionCapacity = 10000

agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 100000
agent.channels.ch2.transactionCapacity = 10000

agent.channels.ch3.type = memory
agent.channels.ch3.capacity = 100000
agent.channels.ch3.transactionCapacity = 10000

配置中的 Kafka 地址请自行修改为自己的集群地址。我这里使用的是 Flume 1.6,因为公司的 Kafka 版本比较低,所以需要自己加入 Kafka client 的 jar 包;Flume 1.7 不需要额外添加,请自行注释掉 pom 中 Kafka client 的依赖。完整代码地址:(原文链接已丢失)。整个工程是一个 Flume 插件,直接用 Maven 打包会生成一个带 -bin 后缀的文件,将其复制到 Flume 根目录后直接解压即可。启动方式请参考相关文章(原文链接已丢失)的摘要说明,修改 service.sh 脚本中的 Flume 启动命令参数。

转载于:https://my.oschina.net/mingpeng/blog/874401

你可能感兴趣的文章
Building Android Apps 30条建议
查看>>
【Spring实战】—— 13 AspectJ注解切面
查看>>
Java 集合并交补
查看>>
16.4. jstat - Java Virtual Machine Statistics Monitoring Tool
查看>>
一脸懵逼学习KafKa集群的安装搭建--(一种高吞吐量的分布式发布订阅消息系统)...
查看>>
[Everyday Mathematics]20150209
查看>>
Python图片处理库之PIL
查看>>
(转)基于深度学习的物体检测
查看>>
利用java代码和web拦截器轻松实现一个app抓包工具
查看>>
数制系统
查看>>
cmd连接mysql操作命令
查看>>
“2017最受欢迎中国开源软件”奖TOP 20揭晓 阿里中间件4大项目连续霸榜!
查看>>
Log4j 配置 的webAppRootKey参数问题
查看>>
第 14 章 MyBatis
查看>>
176.6. git-svn - Bidirectional operation between a single Subversion branch and git
查看>>
linux中授予普通用户root权限
查看>>
我的架构之路 — 配置中心(二)— 在已有项目中实际应用
查看>>
分布式监控系统Zabbix3.2对数据库的连接数预警
查看>>
undo表空间文件丢失恢复(1)--有备份
查看>>
[20151017]lsnrctl servcices.txt
查看>>