问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

如何利用jgroups实现分布式

发布网友 发布时间:2022-04-29 18:33

我来回答

1个回答

热心网友 时间:2022-06-19 06:45

我自己写了一个JavaGroupBroadcastingManager.Java类实现消息的管理(包括发送和接收),代码参考了oscache的相关代码,在其基础上进行了改进.
代码如下:
1、JavaGroupBroadcastingManager.java
package com.yz;
import com.opensymphony.oscache.base.FinalizationException;
import com.opensymphony.oscache.base.InitializationException;
import com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.blocks.NotificationBus;
import java.io.Serializable;
import java.util.Properties;
/**
* @author yangzheng
* @version $Revision$
* @since 2005-7-14
*/
public class JavaGroupBroadcastingManager
implements NotificationBus.Consumer {
private static final Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
private static final String BUS_NAME = "OSCacheBus";
private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
private NotificationBus bus;
/**
* Initializes the broadcasting listener by starting up a JavaGroups notification
* bus instance to handle incoming and outgoing messages.
*
*/
public synchronized void initialize(Properties config) throws InitializationException {
String properties = config.getProperty(CHANNEL_PROPERTIES);
String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
if (log.isInfoEnabled()) {
log.info("Starting a new JavaGroups broadcasting listener with properties="
+ properties);
}
try {
bus = new NotificationBus(BUS_NAME, properties);
bus.start();
bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
bus.setConsumer(this);
log.info("JavaGroups clustering support started successfully");
} catch (Exception e) {
throw new InitializationException("Initialization failed: " + e);
}
}
/**
* Shuts down the JavaGroups being managed
*/
public synchronized void finialize() throws FinalizationException {
if (log.isInfoEnabled()) {
log.info("JavaGroups shutting down...");
}
bus.stop();
bus = null;
if (log.isInfoEnabled()) {
log.info("JavaGroups shutdown complete.");
}
}
/**
* Uses JavaGroups to broadcast the supplied notification message across the cluster.
*
*/
protected void sendNotification(Serializable message) {
bus.sendNotification(message);
}
/**
* Handles incoming notification messages from JavaGroups. This method should
* never be called directly.
*
*/
public void handleNotification(Serializable serializable) {
log.info("An cluster notification message received message " + serializable.toString()
+ "). Notification ignored.");
}
/**
* We are not using the caching, so we just return something that identifies
* us. This method should never be called directly.
*/
public Serializable getCache() {
return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
}
/**
* A callback that is fired when a new member joins the cluster. This
* method should never be called directly.
*
* @param address The address of the member who just joined.
*/
public void memberJoined(Address address) {
if (log.isInfoEnabled()) {
log.info("A new member at address '" + address + "' has joined the cluster");
}
}
/**
* A callback that is fired when an existing member leaves the cluster.
* This method should never be called directly.
*
* @param address The address of the member who left.
*/
public void memberLeft(Address address) {
if (log.isInfoEnabled()) {
log.info("Member at address '" + address + "' left the cluster");
}
}
}
2、发送消息的程序:
package com.yz;
import java.io.FileInputStream;
import java.util.Properties;
/**
* @author yangzheng
* @version $Revision$
* @since 2005-7-14
*/
public class TestJavaGroupBroadcastSend {
public static void main(String[] args) throws Exception {
JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();
Properties properties = new Properties();
properties.load(new FileInputStream("javagroup.properties"));
javaGroupBroadcastingManager.initialize(properties);
String message = "hello world!";
while (true) {
Thread.sleep(1000);
javaGroupBroadcastingManager.sendNotification(message);
}
}
}

3、接受消息的程序:
package com.yz;
import java.io.FileInputStream;
import java.util.Properties;
/**
* @author yangzheng
* @version $Revision$
* @since 2005-7-14
*/
public class TestJavaGroupBroadcastReceive {
public static void main(String[] args) throws Exception {
JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();
Properties properties = new Properties();
properties.load(new FileInputStream("javagroup.properties"));
javaGroupBroadcastingManager.initialize(properties);

Thread.sleep(100000000);
}
}
4、配置文件:(基本上不用改动)
javagroup.properties
cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;/
mcast_send_buf_size=150000;mcast_recv_buf_size=80000):/
PING(timeout=2000;num_initial_members=3):/
MERGE2(min_interval=5000;max_interval=10000):/
FD_SOCK:VERIFY_SUSPECT(timeout=1500):/
pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):/
UNICAST(timeout=300,600,1200,2400):/
pbcast.STABLE(desired_avg_gossip=20000):/
FRAG(frag_size=8096;down_thread=false;up_thread=false):/
pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
cache.cluster.multicast.ip=231.12.21.132
5、所需要的jar包
commons-logging-1.0.4.jar
jgroups-2.2.8.jar concurrent.jar 属于jgroups的包
6、说明:
1、发送消息和接受消息的程序都需要调用JavaGroupBroadcastingManager.initialize()方法初始化jgroup。
2、运行环境的多台服务器要在同一个局域网内,同时hosts中不要将127.0.0.1写入,以便jgroup获得本机的ip,而不是获得127.0.0.1
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
梦见老婆光脚踩谷子 王者荣耀国服中,不论进入游戏还是没开始游戏,所有玩家发送的任何消息... 我的滴滴开空调活动怎么没了 为什么腾讯视频显示还是要流量看 没有联通中国结标志 电脑不能装win7买电脑时他们说只能装win10系统不能装win7 英雄联盟手游 手机lol游戏盒子现在叫什么? 英雄联盟手机盒子看不了比赛记录 手游英雄联盟视频在哪里看手机英雄联盟盒子在哪看我游戏中录的视频 英雄联盟盒子看视频只有声音没有画面,不是网速的问题,也不是电脑卡,是... 为什么英雄联盟盒子没发看视频 windows Internet Explorer此网页上的错误可能会使它无法正确运行 电脑上网开网页提示(是ie10的浏览器)此网页上的错误可能会使它无法正确运行 java中的分布式是什么意思,和在同一台电脑上运行的程序有什么区别? 打开某些网页屏幕右下角此网页上的错误命令可能会使它无法正确运行 缺少对象 F 代码;0 URI:http//tieba.b 为什么打开网页时老是出现“此网页上的错误可能会使它无法正确运行”? 此网页上的错误可能会使它无法正确运行 电脑上网开网页提示,此网页上的错误可能会使它无法正确运行。 电脑显示“此网页上的错误可能会使它无法正确运行”是什么问题? 在幻灯片的左上角总是有一个图标,如何能将它去掉? 背中国行政区 离广东近的有哪些地区或哪些省? 税务局领发票流程是什么 东北经济水平和江浙沪的差距有多大 巴中属于华东还是华南 广西属于江浙沪皖吗 请问华中仓,和华南仓各是哪里的啊? 山东属于什么地区?华东、华南、东北?还是什么地区?多谢了! 小米11图片转pdf不能用了- 问一问 台湾属于华东南??华东,还是华南??? 怎么删除爱奇艺32位图标? 分布式Java应用的目录 50分紧急求助,用ie浏览器打开一些网页, 总是显示 “此页面上的错误可能会使它无法正常运行” 是什么原因 梦到家里有人偷书而且是当废纸卖掉什么意思? 玫瑰小镇网页打不开 显示网页上有错误 可是别的网页都能打开 如何部署java分布式应用程序 美国LINUX空间和香港LINUX空间那个更好用? 请教:电脑屏幕上出现:Windows Internet Explorer ! 此网页上得错误 求教 打开网页出现【此网页上的错误可能会使它无法正常运行】XP_Frmtravel.asp 代码:0 请看图片 华域迅通的linux香港空间怎么样? 香港LINUX空间怎么操作? 此网页上的错误可能会使它无法正确运行,缺少对象pageheight.js。代码:0. pageheight.js是什么 无忧主机网(51php)的香港Linux虚拟主机怎么样啊? 购买香港空间,用win主机好还是linux主机好? 梦见收到一个旅行箱里面有自己的旧衣服和钱? 香港WIN主机空间和香港LINUX主机空间有什么区别 香港Linux空间的有哪些 废纸盒纸箱怎么卖??? 香港Linux虚拟主机哪家有?哪家主推香港Linux虚拟主机? 推荐速度快的美国或者香港的linux虚拟主机 求香港空间可以子目录绑定,并且要是linux的。