问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

如何在Weblogic的全局事务执行多线程操作

发布网友 发布时间:2023-07-14 17:36

我来回答

1个回答

热心网友 时间:2024-08-11 20:21

如何在Weblogic的全局事务执行多线程操作

出自:http://www.blogjava.net/fjin/archive/2009/07/31/289276.html

今天有人提出了一个诡异的要求,要求在全局事务中执行多线程操作。他们全局事务中涉及两个数据库中的多个表,如果单线程那么走完,相应时间上不满足要求,说白了就是比较慢,于是提出了这样的要求。从JTA的规范来看,transaction(TX)和thread是密切相关的,TX一般是不能在应用线程间传递的, 即我主线程起一个全局事务,然后我把这个事务传递给其他我新起的线程,单纯的变量传递没问题,但这个事务是不能被transaction manager(TM)识别的,TM对TX的管理有他自己的方式。从weblogic的实现来看,TX被放在当前线程的threadlocal中,普通应用线程不存在这样的结构,所以简单的变量传递,对于TM而言是没有意义的。那么到底有没有方法实现上面的需求的,我做了些测试,使用weblogic内部的一些API可以实现这个需求。下面我们就来看看实现中的几个要点: :)

1:上面说了,简单的变量传递对于weblogic的TM是没有意义的。TM判断事务上下文(transaction context)的时候,会从当前线程的threadlocal检查,如果没有,则说明当前线程没有和任何TX关联。那么我们如何将我们手里的TX放入当前线程的threadlocal呢? weblogic的ExecuteThread是我们需要的那种线程,但它是final的,我们不能继承它,只能继承它的父类了,也就是weblogic.kernel.AuditableThread。

2:我们有继承了AuditableThread,那么我们怎么把TX放入它的threadlocal中呢?这个可以通过weblogic的TM实现中的一些API来实现,具体到这个类就是weblogic.transaction.internal.TransactionManagerImpl。比如interResume(tx),internalSuspend()。由于这个API不是package protect的,我们自己的类必须也位于weblogic.transaction.internal这个包中。interResume(tx),用于将当前线程和指定的TX做关联,而internalSuspend()恰恰相反,它用于解除这种关联。

3:因为涉及到多线程,主线程需要决定何时提交或回滚事务,这个我们要自己要实现一个线程结果检查的方法(checkCompletion())。

下面就是我自己实现的测试代码,在Weblogic81测试没有问题。
1 package weblogic.transaction.internal;
2
3 import weblogic.transaction.TxHelper;
4 import weblogic.transaction.internal.TransactionManagerImpl;
5 import javax.transaction.Transaction;
6 import java.util.ArrayList;
7
8 public class DriverTest {
9
10 private static String INITIAL_CONTEXT_FACTORY = "weblogic.jndi.WLInitialContextFactory";
11 private static String PROVIDER_URL = "t3://localhost:8001";
12 private static String SQL_INSERT = "insert into test values(?)";
13 private static String ANO_SQL_INSERT = "insert into test1 values(?)";
14
15 public static void main(String args[])
16 {
17 DriverTest test = new DriverTest();
18 test.multiThreadXATest();
19 }
20
21 private Connection getConnection(String url, String dsName) throws NamingException, SQLException
22 {
23 InitialContext ctx = initializeEnv(url);
24 DataSource ds = (DataSource)ctx.lookup(dsName);
25 ctx.close();
26 return ds.getConnection();
27 }
28
29 private UserTransaction getUserTransaction() throws NamingException, SQLException
30 {
31 InitialContext ctx = initializeEnv(null);
32 return (UserTransaction)ctx.lookup("javax/transaction/UserTransaction");
33 }
34
35 private InitialContext initializeEnv(String url) throws NamingException
36 {
37 Properties prop = new Properties();
38 if(url == null)
39 prop.put(Context.PROVIDER_URL, PROVIDER_URL);
40 else
41 prop.put(Context.PROVIDER_URL, url);
42 prop.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
43 return new InitialContext(prop);
44 }
45
46 private void executeInsertInPSMT(Connection conn, String sql)
47 {
48 PreparedStatement pstmt = null;
49 try{
50 pstmt = conn.prepareStatement(sql);
51 pstmt.setString(1, "data_to_insert");
52 pstmt.executeUpdate();
53 pstmt.close();
54 }catch(SQLException e){
55 e.printStackTrace();
56 }
57 }
58
59 public void multiThreadXATest()
60 {
61 ArrayList result = new ArrayList();
62 try{
63 UserTransaction userTx = getUserTransaction();
64 userTx.setTransactionTimeout(1000);
65 userTx.begin();
66 Transaction tx = TxHelper.getTransaction();
67 Connection conn = getConnection("t3://localhost:8011", "TestXADS");
68 if(conn != null) conn.close();
SQLThread thread1 = new SQLThread(tx,result,"t3://localhost:8011","TestXADS", SQL_INSERT);
69 SQLThread thread2 = new SQLThread(tx,result,"t3://localhost:8021","TestXADS_1", ANO_SQL_INSERT);
70 thread1.start();
71 thread2.start();
72 while(result.size() != 2){
73 Thread.currentThread().sleep(1);
74 }
75 if(checkCompletion(result)){
76 userTx.commit();
77 }
78 else{
79 userTx.rollback();
80 }
81 }catch(Exception e){
82 e.printStackTrace();
83 }
84 }
85
86 private boolean checkCompletion(ArrayList result){
87 boolean toReturn = true;
88 for(int loop=0; loop<result.size(); loop++){
89 if((!((String)result.get(loop)).equals("OK"))){
90 toReturn = false;
91 break;
92 }
93 }
94 return toReturn;
95 }
96
97 class SQLThread extends weblogic.kernel.AuditableThread {
98
99 private Transaction tx = null;
100 private ArrayList result = null;
101 private String dsName = null;
102 private String url = null;
103 private String sql = null;
104
105 public SQLThread(Transaction tx,ArrayList result,String ds, String url, String sql){
106 this.tx = tx;
107 this.result = result;
108 this.dsName = ds;
109 this.url = url;
110 this.sql = sql;
111 }
112
113 public void run(){
114 Connection conn = null;
115 try{
116 TransactionManagerImpl tm = (TransactionManagerImpl)TransactionManagerImpl.getTransactionManager();
117 tm.internalResume((TransactionImpl)tx);
118 DriverTest test = new DriverTest();
119 conn = test.getConnection(url, dsName);
120 test.executeInsertInPSMT(conn, sql);
121 conn.close();
122 tm.internalSuspend();
123 result.add("OK");
124 }catch(Exception e){
125 result.add("NA");
126 e.printStackTrace();
127 }finally{
128 try{
129 if(conn != null)
130 conn.close();
131 }catch(Exception e){
132 e.printStackTrace();
133 }
134 }
135 }
136 }
137 }
138
139

下面是关于上面这段测试代码的一些解释和代码中的*:
1:为什么会在66行出现Connection conn = getConnection("t3://localhost:8011", "TestXADS");这个看似无用的语句?Weblogic的TM实现中只有有XAResource参与到这个global transaction的server实例才有资格充当这个global transaction的coordinator,其他的server实例只能充当sub-coordinator。而且总是第一个参与全局事务的XAResource的实例充当coordinator,因为coordinator的委任决定于TX开始后,第一次RMI request发送给哪个server。Connection conn = getConnection("t3://localhost:8001", "TestXADS")用于指定这个global transaction的coordinator为8011这个server。如果没有这个语句,thread1,thread2启动后,它们开始XA操作时,每个XAResouce都会把自己当作这个TX的coordinator(Thread1委任8011,Thread2委任8021),这样就会出现如下的异常,

javax.transaction.TransactionRolledbackException: Current server is the coordinator and transaction is not found. It was probably rolled back and forgotten already.
at weblogic.rjvm.BasicOutboundRequest.sendReceive(BasicOutboundRequest.java:108)
at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:290)
at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:247)
at weblogic.jdbc.common.internal.RmiDataSource_814_WLStub.getConnection(Unknown Source)
at weblogic.transaction.internal.DriverTest1.getConnection(DriverTest1.java:39)
at weblogic.transaction.internal.DriverTest1.access$0(DriverTest1.java:34)
at weblogic.transaction.internal.DriverTest1$SQLThread.run(DriverTest1.java:135)

2:某个全局事务中启动的线程,不能同时操作同一个XAResource,比如Thread1操作datasource1和datasource2,thread2操作datasource2和datasource3。Weblogic中,我们做XA操作的时候,需要同后端的XA Resource Manager交互,交互中我们会多次调用xaStart(xid, flag),xaEnd(xid, flag)这里的flag可以使NOFLAGS、TMSUCESS、TMRESUME、TMSUSPEND等。如果我们在同一个全局事务的多个线程中同时操作某个RESOURCE,那么就可能我们不同线程先后给这个RESOUCE的RM发送相同的FLAG,比如xaStart(xid, TMSUSPEND),即两个线程同时发送TMSUSPEND,这样会引发XA_ERR,如下:

java.sql.SQLException: Unexpected exception while enlisting XAConnection java.sql.SQLException: XA error: XAER_RMERR : A resource manager error has occured in the transaction branch start() failed on resource 'TestXAPool_1': XAER_RMERR : A resource manager error has occured in the transaction branch
oracle.jdbc.xa.OracleXAException
at oracle.jdbc.xa.OracleXAResource.checkError(OracleXAResource.java:1017)
at oracle.jdbc.xa.client.OracleXAResource.start(OracleXAResource.java:227)
at weblogic.jdbc.wrapper.VendorXAResource.start(VendorXAResource.java:50)
at weblogic.jdbc.jta.DataSource.start(DataSource.java:629)
at weblogic.transaction.internal.XAServerResourceInfo.start(XAServerResourceInfo.java:1142)
at weblogic.transaction.internal.XAServerResourceInfo.xaStart(XAServerResourceInfo.java:1073)
at weblogic.transaction.internal.XAServerResourceInfo.enlist(XAServerResourceInfo.java:241)
at weblogic.transaction.internal.ServerTransactionImpl.enlistResource(ServerTransactionImpl.java:463)
at weblogic.jdbc.jta.DataSource.enlist(DataSource.java:1392)
at weblogic.jdbc.jta.DataSource.refreshXAConnAndEnlist(DataSource.java:1334)
at weblogic.jdbc.jta.DataSource.getConnection(DataSource.java:396)
at weblogic.jdbc.jta.DataSource.connect(DataSource.java:354)
at weblogic.jdbc.common.internal.RmiDataSource.getConnection(RmiDataSource.java:305)
at weblogic.jdbc.common.internal.RmiDataSource_WLSkel.invoke(Unknown Source)
......

虽然测试中没有什么问题,但我不建议谁这么去做,毕竟我们需要遵循规范。写这么个例子,只是让大家对weblogic的transaction加深些理解,而不是真的要在生产系统中这样去做。
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
平安e家保是什么险 2023年高级会计师报名时间和条件 2022高级会计师报考条件要求是什么? 问几个英文语法问题,希望能正确回答~ 女生锁骨下长痣,左右都有 不是很对称 寓意什么呐? 抖音上传照片音乐短怎么办 断夜奶好方法 婴儿断夜奶最佳方法 建设银行,你打算坑害多少优质客户? 【PS教程】如何做出凹陷字体效果的氛围海报 即食米粉生产线整套有多少台设备呢? 求 叶阳岚的《重生之公主千岁》 急用 谢了 宿迁海欣星城小区周边配套怎么样? 吸出来的母乳给宝宝吃要加热吗 地铁里挤是一种什么样的体验? 幽灵行动荒野黄金版和豪华版有什么区别 买哪个 招商信用卡怎么领取积分? 坐地铁很拥挤是啥体验? 招商信用卡如何获取积分? 榕树叶浆粘车面柒怎么办 榕树汁沾手背上感染毒以后怎么治疗 大榕树叶子表面的蜡质层去除的物理方法是什么 风筝的孩子哪里人 中国最高军官穿什么颜色军服 为什么榴莲一直那么贵? 绿箭侠的女朋友都有谁 第三季绿箭侠与it女在一起了吗 句句触动人心的经典句子,不闻不问不一定是忘了,但一定是疏远了_百度... 蓝紫色翡翠好不好翡翠蓝紫怎么样 求silent siren 恋い雪平假名片假名歌词 一文弄懂自身产品适合的推广方式,拉升对应流量放大店铺体量_百度... 培训机构700万体量多少 2003年3月11日有什么天气异象? 人工种植冬虫夏草与天然的区别,人工培育冬虫夏草和天然的区别 媚上欺下的是情商高吗 欺负老实人巴结有权用哪个成语好? 哈弗h6后座车窗为什么在行程中自动锁死?后座不能升降玻璃怎么解决_百度... 刮大板之后过柱子用什么展开剂 仿写作文星空让我回味 山东平阴电动车规范停放丢车包赔 发工资跟交社保的不是一家公司违法吗? ...母公司员工母公司发放工资,社保是由子公司购买的,这种情况合法吗... 两个公司法人一样,再一个公司交社保,另外一个发工资怎么开证明_百度知 ... 公司把我的工资社保交在另一个公司违法吗 老板有两家公司,比如员工薪资一万五,一家公司缴纳最低社保,一家公司用... 一个办公建筑单体,建筑面积约5万平方米,若做绿色三星,每年可以减少多少... 宝宝在幼儿园多次被一个小男孩抓脸,该怎么办? 陌生人摸脸抱宝宝 妈妈该如何拒绝? 谁可以坚定一下这个和田墨玉的金镶玉手串值多少钱呢? 明日之后双人床如何制作