如何利用jgroups实现分布式
发布网友
发布时间:2022-04-29 18:33
我来回答
共1个回答
热心网友
时间:2022-06-19 06:45
我自己写了一个 JavaGroupBroadcastingManager.java 类来实现消息的管理(包括发送和接收),代码参考了 OSCache 的相关代码,并在其基础上做了改进。
代码如下:
1、JavaGroupBroadcastingManager.java
package com.yz;
import com.opensymphony.oscache.base.FinalizationException;
import com.opensymphony.oscache.base.InitializationException;
import com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.blocks.NotificationBus;
import java.io.Serializable;
import java.util.Properties;
/**
 * Manages a JavaGroups {@link NotificationBus}: starts and stops the bus,
 * broadcasts notifications to the cluster, and logs incoming notifications
 * and membership changes.
 *
 * Lifecycle: call {@link #initialize(Properties)} once before sending, and
 * {@link #finialize()} when done.
 *
 * @author yangzheng
 * @version $Revision$
 * @since 2005-7-14
 */
public class JavaGroupBroadcastingManager
        implements NotificationBus.Consumer {
    // Log under this class's own name (the original used the OSCache listener class).
    private static final Log log = LogFactory.getLog(JavaGroupBroadcastingManager.class);
    private static final String BUS_NAME = "OSCacheBus";
    private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
    // Key present in the sample configuration; this class does not read it.
    private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
    private NotificationBus bus;

    /**
     * Initializes the manager by starting up a JavaGroups notification bus
     * instance to handle incoming and outgoing messages.
     *
     * @param config configuration; the value of <code>cache.cluster.properties</code>
     *               is passed to the bus as its channel properties
     * @throws InitializationException if the bus cannot be created or started;
     *                                 the underlying exception is attached as the cause
     */
    public synchronized void initialize(Properties config) throws InitializationException {
        String properties = config.getProperty(CHANNEL_PROPERTIES);
        if (log.isInfoEnabled()) {
            log.info("Starting a new JavaGroups broadcasting listener with properties="
                    + properties);
        }
        try {
            bus = new NotificationBus(BUS_NAME, properties);
            bus.start();
            // Channel.LOCAL=false: per JGroups, do not deliver this node's own messages back to it.
            bus.getChannel().setOpt(Channel.LOCAL, Boolean.FALSE);
            bus.setConsumer(this);
            log.info("JavaGroups clustering support started successfully");
        } catch (Exception e) {
            // Do not keep a half-started bus around: release it and reset the field.
            if (bus != null) {
                try {
                    bus.stop();
                } catch (Exception ignored) {
                    // Best-effort cleanup; the original failure is the one reported below.
                }
                bus = null;
            }
            InitializationException failure =
                    new InitializationException("Initialization failed: " + e);
            failure.initCause(e); // keep the original stack trace for diagnosis
            throw failure;
        }
    }

    /**
     * Shuts down the JavaGroups bus being managed. Does nothing if the bus was
     * never initialized or has already been shut down.
     *
     * @throws FinalizationException declared for callers; not thrown by this implementation
     */
    public synchronized void finialize() throws FinalizationException {
        if (bus == null) {
            return;
        }
        if (log.isInfoEnabled()) {
            log.info("JavaGroups shutting down...");
        }
        try {
            bus.stop();
        } finally {
            // Drop the reference even if stop() fails, so the manager is not left half-closed.
            bus = null;
        }
        if (log.isInfoEnabled()) {
            log.info("JavaGroups shutdown complete.");
        }
    }

    /**
     * Uses JavaGroups to broadcast the supplied notification message across the cluster.
     *
     * @param message the message to broadcast
     * @throws IllegalStateException if {@link #initialize(Properties)} has not been
     *                               called successfully
     */
    protected void sendNotification(Serializable message) {
        NotificationBus current = bus;
        if (current == null) {
            throw new IllegalStateException(
                    "JavaGroups bus is not initialized; call initialize() first");
        }
        current.sendNotification(message);
    }

    /**
     * Handles incoming notification messages from JavaGroups by logging them.
     * This method should never be called directly.
     *
     * @param serializable the received message
     */
    public void handleNotification(Serializable serializable) {
        if (log.isInfoEnabled()) {
            log.info("A cluster notification message was received (message=" + serializable
                    + "). Notification ignored.");
        }
    }

    /**
     * We are not using the caching, so we just return something that identifies
     * us. This method should never be called directly.
     *
     * @return an identifying string containing the bus's local address
     */
    public Serializable getCache() {
        return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
    }

    /**
     * A callback that is fired when a new member joins the cluster. This
     * method should never be called directly.
     *
     * @param address The address of the member who just joined.
     */
    public void memberJoined(Address address) {
        if (log.isInfoEnabled()) {
            log.info("A new member at address '" + address + "' has joined the cluster");
        }
    }

    /**
     * A callback that is fired when an existing member leaves the cluster.
     * This method should never be called directly.
     *
     * @param address The address of the member who left.
     */
    public void memberLeft(Address address) {
        if (log.isInfoEnabled()) {
            log.info("Member at address '" + address + "' left the cluster");
        }
    }
}
2、发送消息的程序:
package com.yz;
import java.io.FileInputStream;
import java.util.Properties;
/**
 * Demo sender: loads <code>javagroup.properties</code> from the working
 * directory, initializes a {@link JavaGroupBroadcastingManager} and broadcasts
 * "hello world!" once per second until the thread is interrupted or the
 * process is killed.
 *
 * @author yangzheng
 * @version $Revision$
 * @since 2005-7-14
 */
public class TestJavaGroupBroadcastSend {
    public static void main(String[] args) throws Exception {
        JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();
        Properties properties = new Properties();
        FileInputStream in = new FileInputStream("javagroup.properties");
        try {
            properties.load(in);
        } finally {
            in.close(); // the original never closed the stream
        }
        javaGroupBroadcastingManager.initialize(properties);
        String message = "hello world!";
        try {
            while (true) {
                Thread.sleep(1000);
                javaGroupBroadcastingManager.sendNotification(message);
            }
        } finally {
            // Reached only if the loop is aborted by an exception (e.g. interrupt): stop the bus.
            javaGroupBroadcastingManager.finialize();
        }
    }
}
3、接收消息的程序:
package com.yz;
import java.io.FileInputStream;
import java.util.Properties;
/**
 * Demo receiver: loads <code>javagroup.properties</code> from the working
 * directory, initializes a {@link JavaGroupBroadcastingManager} and then
 * sleeps so the manager's callbacks can log whatever arrives from the cluster.
 *
 * @author yangzheng
 * @version $Revision$
 * @since 2005-7-14
 */
public class TestJavaGroupBroadcastReceive {
    public static void main(String[] args) throws Exception {
        JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();
        Properties properties = new Properties();
        FileInputStream in = new FileInputStream("javagroup.properties");
        try {
            properties.load(in);
        } finally {
            in.close(); // the original never closed the stream
        }
        javaGroupBroadcastingManager.initialize(properties);
        try {
            // Keep the JVM alive (100,000,000 ms) while notifications are received.
            Thread.sleep(100000000);
        } finally {
            // Stop the bus when the wait ends or is interrupted.
            javaGroupBroadcastingManager.finialize();
        }
    }
}
4、配置文件:(基本上不用改动)
javagroup.properties
cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;\
mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
PING(timeout=2000;num_initial_members=3):\
MERGE2(min_interval=5000;max_interval=10000):\
FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
UNICAST(timeout=300,600,1200,2400):\
pbcast.STABLE(desired_avg_gossip=20000):\
FRAG(frag_size=8096;down_thread=false;up_thread=false):\
pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
cache.cluster.multicast.ip=231.12.21.132
5、所需要的jar包
commons-logging-1.0.4.jar
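jgroups-2.2.8.jar、concurrent.jar(concurrent.jar 是 jgroups 所依赖的包)
oscache 的 jar 包(代码中引用了 com.opensymphony.oscache 下的 InitializationException、FinalizationException 等类)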
6、说明:
1、发送消息和接收消息的程序都需要先调用 JavaGroupBroadcastingManager 的 initialize() 方法来初始化 jgroups。
2、运行环境中的多台服务器要在同一个局域网内;同时 hosts 文件中不要把本机主机名映射到 127.0.0.1,以便 jgroups 获取到本机的真实 IP,而不是 127.0.0.1。