飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

dubbo接口方法重载且入参未显式指定序列化id导致ClassCastException分析

时间:2022-01-17  作者:zhangyjblogs  

问题描述&模拟

线上登录接口,通过监控查看,有类型转换异常,具体报错如下图

image-20220108220128374

此报错信息是dubbo consumer端显示,且登录大部分是正常,有少量部分会报类型转换异常,同事通过更换方法名+显示指定序列化id解决此问题,但是产生这个问题的真正原因是什么呢?没有指定序列化id吗?还是dubbo方法重载问题?为什么服务端不显示此错误信息呢?,下面根据错误模拟下情况。

线上运行情况说明,报错的这台客户端部署在容器内,jdk版本

image-20220108220823552

服务方是混跑,有虚拟机和容器,容器的jdk版本相同,虚拟机jdk版本

image-20220108220912474

一开始认为是由于没有显示指定序列化id导致容器调用虚拟机的服务,由于jvm版本不一致导致的解码问题,但是分析和试验后,发现并非如此,模拟情况如下:

定义一个dubbo服务,方法重载且入参不显示指定序列化id,代码如下

//定义dubbo服务
public interface ProductService {
	Result<ProductVO> findProduct(String data);
	Result<ProductVO> findProduct(ProductDTO product);
}

//入参
@Data
public class ProductDTO  implements Serializable {
    //不显示指定序列化id
	private Integer productId;
	private String sn;
	private String code;
}

//出参
@Data
public class ProductVO implements Serializable{
	private static final long serialVersionUID = 4529782262922750326L;
	private Integer productId;
	private String productName;
}

dubbo客户端调用域名Product(ProductDTO product),并使用域名2版本,服务方使用域名版本,经过试验(jmeter压测),发现并未出现类型转换异常,现在通过代码分析来排除。

分析&dubbo provider处理请求流程

采用逆序方法,使用arthas进行反编译dubbo生成的代理类,ProductService生成的代理类是Wrapper2,内容如下

public Object invokeMethod(Object object, String name, Class[] classArray, Object[] objectArray)
			throws InvocationTargetException {
		ProductService productService;
		try {
			productService = (ProductService) object;
		} catch (Throwable throwable) {
			throw new IllegalArgumentException(throwable);
		}
		try {
			if ("findProduct".equals(name) && 域名th == 1
					&& classArray[0].getName().equals("域名ng")) {
				return 域名Product((String) objectArray[0]);
			}
			if ("findProduct".equals(name) && 域名th == 1
					&& classArray[0].getName().equals("域名.ProductDTO")) {
				return 域名Product((ProductDTO) objectArray[0]);
			}
		} catch (Throwable throwable) {
			throw new InvocationTargetException(throwable);
		}
		throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(name)
				.append("\" in class 域名.ProductService.").toString());
	}
}

通过查看反编译后的代码,得知dubbo方法重载,会根据方法类型和参数个数找到对应的目标方法执行。对于我这个线上问题,参数是ProductDTO,如果调用的是findProduct(String data),说明classArray[0]即参数类型是String类型,那么参数类型是如何得来的呢?根据自己之前写的dubbo流程分析,查看源码,在域名域名域名ractProxyInvoker#invoke(Invocation invocation),代码内容如下

image-20220108223056936

方法名称+方法类型+方面参数都封装在Invocation内,接着查找Invocation的来源,在DubboProtocol的匿名内部类DubboProtocol$1内发现,具体是reply(ExchangeChannel channel, Object message)方法内,参数message就是Invocation。

image-20220108225859703

接着看哪里调用DubboProtocol$域名y(ExchangeChannel channel, Object message)方法,在域名域名域名域名erExchangeHandler#handleRequest(ExchangeChannel channel, Request req)方法内,域名域名域名ata()获取此Invocation,即DecodeableRpcInvocation,那么接着看Request 以及域名a的来源;

接着向上找,在域名域名域名域名erExchangeHandler#received(Channel channel, Object message)的入参message就是Request ;

继续向上找,域名域名域名deHandler#received(Channel channel, Object message)的入参就是Request ,其中会对域名a即Invocation进行解码(默认在IO线程已经解码过,这里实际并不会再执行解码DecodeableRpcInvocation#hasDecoded=true)。

image-20220108230753355

继续向上找,域名域名域名nelEventRunnable#run()线程,message属性就是Request,那么接着只能找ChannelEventRunnable是如何创建并提交的

image-20220108230921910

继续向上找,在域名域名域名.AllChannelHandler#received(Channel channel, Object message)方法内创建ChannelEventRunnable并提交到线程池执行。

继续向上找,在域名域名域名域名ived(Channel channel, Object message),入参message就是Request

继续向上找,域名域名域名ived(Channel channel, Object message)

image-20220109003230986

继续向上找,域名域名域名ived(Channel ch, Object msg)

继续向上找,域名域名域名域名nelRead(ChannelHandlerContext ctx, Object msg),看到这个就说明是netty的work线程,NettyServerHandler是个inbound & outbound事件

dubbo service netty启动添加的inbound&outbound即pipeline chain[HeadContext InternalDecoder InternalEncoder NettyServerHandler TailContext],说明前面肯定有执行InternalDecoder 的channelRead事件。此时入参message就是Request。

下面着重分析InternalDecoder 的channelRead事件,执行堆栈依次为:

InternalDecoder(域名域名ToMessageDecoder).channelRead(ChannelHandlerContext ctx, Object msg)
InternalDecoder(域名域名ToMessageDecoder).callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
域名de(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
域名de(Channel channel, ChannelBuffer buffer)
DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)
DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
域名deBody(Channel channel, InputStream is, byte[] header)
域名de()
域名de(Channel channel, InputStream input)

InternalDecoder是netty pipeline的inboud事件,执行的是channelRead,具体逻辑在域名de(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)内,代码如下

image-20220109010146098

接着触发下一个inbound的channelRead动作,传入的就是Request了,代码说明如下

image-20220109010534969

接着看域名de(Channel channel, ChannelBuffer buffer),这里进行解码

//域名域名域名oCountCodec#decode(Channel channel, ChannelBuffer buffer)
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = 域名erIndex();//获取读位置
    MultiMessage result = 域名te();//MultiMessage是Request的集合
    do {
        Object obj = 域名de(channel, buffer);//使用DubboCodec进行解码,下面根据解码结果进行不同处理
        if (域名_MORE_INPUT == obj) {//说明发生了tcp粘包,退出循环
            域名erIndex(save);
            break;
        } else {
            域名essage(obj);//把obj即Request添加到集合MultiMessage
            logMessageLength(obj, 域名erIndex() - save);
            save = 域名erIndex();//设置新的buffer读位置,继续使用DubboCodec进行解码
        }
    } while (true);
    if (域名pty()) {
        return 域名_MORE_INPUT;
    }
    if (域名() == 1) {//如果MultiMessage只有一个元素,则说明本次没有发生粘包
        return 域名(0);//返回Request
    }
    return result;//返回MultiMessage,在后续的MultiMessagehandler内获取Request的集合遍历处理
}

接着看DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)解码过程,如何对dubbo协议解码的,先看下dubbo协议的报文结构

接着看代码,对着报文结构进行解码

//DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = 域名ableBytes();
    byte[] header = new byte[域名(readable, HEADER_LENGTH)];
    域名Bytes(header);//把缓冲区字节存放到header
    return decode(channel, buffer, readable, header);
}

//DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
        || readable > 1 && header[1] != MAGIC_LOW) {//非魔数,说明非dubbo报文的开头,说明发生了tcp拆包/粘包
        int length = 域名th;
        if (域名th < readable) {
            header = 域名Of(header, readable);
            域名Bytes(header, length, readable - length);
        }
        for (int i = 1; i < 域名th - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                域名erIndex(域名erIndex() - 域名th + i);
                header = 域名Of(header, i);
                break;
            }
        }
        return 域名de(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) {//为什么是小于16呢?因为dubbo报文 magic(2)+falg(1)+status(1)+invokerId(8)+bodyLenght(4)就是16字节了,小于16字节,肯定发生了拆包,本次接收到的数据并没有body
        return 域名_MORE_INPUT;
    }

    // get data length.
    int len = 域名s2int(header, 12);//12的原因是dubbo报文 magic(2)+falg(1)+status(1)+invokerId(8)等于12,从12位后取4位,转换为int,就是body的长度
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    if (readable < tt) {//可读取数少于bodylen+16,说明tcp拆包,需要继续进网络读取
        return 域名_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);//解码body内容
    } finally {
        if (域名lable() > 0) {
            try {
                if (域名rnEnabled()) {
                    域名("Skip input stream " + 域名lable());
                }
                域名UnusedStream(is);
            } catch (IOException e) {
                域名(域名essage(), e);
            }
        }
    }
}

接着看解码dubbo body,在域名域名域名oCodec#decodeBody

//域名域名域名oCodec#decodeBody(Channel channel, InputStream is, byte[] header)
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    long id = 域名s2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {//是响应,编码
        //省略
    } else {//请求,解码
        // decode request.
        Request req = new Request(id);
        域名ersion(域名rotocolVersion());
        域名woWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            域名vent(域名TBEAT_EVENT);
        }
        try {
            Object data;
            if (域名artbeat()) {//心跳
                data = decodeHeartbeatData(channel, 域名rialize(域名rl(), is, proto));
            } else if (域名ent()) {//事件
                data = decodeEventData(channel, 域名rialize(域名rl(), is, proto));
            } else {
                DecodeableRpcInvocation inv;
                if (域名rl().getParameter(
                    域名DE_IN_IO_THREAD_KEY,
                    域名ULT_DECODE_IN_IO_THREAD)) {//默认是在netty work线程进行解码
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    域名de();//解码dubbo body,解码结果保存在DecodeableRpcInvocation
                } else {
                    inv = new DecodeableRpcInvocation(channel, req,
                                                      new UnsafeByteArrayInputStream(readMessageData(is)), proto);//否则在业务线程ChannelEventRunnable进行解码
                }
                data = inv;
            }
            域名ata(data);//把Invocation保存到域名a
        } catch (Throwable t) {
            if (域名rnEnabled()) {
                域名("Decode request failed: " + 域名essage(), t);
            }
            // bad request
            域名roken(true);
            域名ata(t);
        }
        return req;
    }
}

接着看DecodeableRpcInvocation解码dubbo body

//域名域名域名deableRpcInvocation#decode()
@Override
public void decode() throws Exception {
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);//解码
        } catch (Throwable e) {
            if (域名rnEnabled()) {
                域名("Decode rpc invocation failed: " + 域名essage(), e);
            }
            域名roken(true);
            域名ata(e);
        } finally {
            hasDecoded = true;//解码后置位已经解码,这样在ChannelEventRunnable线程内就不会再进行解码
        }
    }
}

//域名域名域名deableRpcInvocation#decode(域名域名nel, 域名tStream)
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = 域名erialization(域名rl(), serializationType)
        .deserialize(域名rl(), input);//根据序列化标识获取反序列对象,dubbo spi的自适应

    String dubboVersion = 域名UTF();//从输入流读取dubbo version
    域名ersion(dubboVersion);
    setAttachment(域名O_VERSION_KEY, dubboVersion);

    setAttachment(域名_KEY, 域名UTF());//从输入流读path
    setAttachment(域名ION_KEY, 域名UTF());//从输入流读版本

    setMethodName(域名UTF());//从输入流读 调用的目标方法名
    try {
        Object[] args;
        Class<?>[] pts;
        String desc = 域名UTF();//从输入流读 参数描述符,即参数的类型 比如[Ljava/lang/String
        if (域名th() == 0) {//dubbo调用方法不存在入参
            pts = 域名Y_CLASS_ARRAY;
            args = 域名Y_OBJECT_ARRAY;
        } else {//dubbo调用方法存在入参
            pts = 域名2classArray(desc);//类型描述符转换为类型,比如[Ljava/lang/String => 域名ng
            args = new Object[域名th];//参数长度
            for (int i = 0; i < 域名th; i++) {
                try {
                    args[i] = 域名Object(pts[i]);//从输入流读取参数,这里是readObject,执行反序列化
                } catch (Exception e) {
                    if (域名rnEnabled()) {
                        域名("Decode argument failed: " + 域名essage(), e);
                    }
                }
            }
        }
        setParameterTypes(pts);//把参数类型保存到Invocation对象,即parameterTypes属性上

        Map<String, String> map = (Map<String, String>) 域名Object(域名s);//从输入流读取隐式参数并解码
        if (map != null && 域名() > 0) {
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            域名ll(map);
            setAttachments(attachment);
        }
        //decode argument ,may be callback
        for (int i = 0; i < 域名th; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }

        setArguments(args);

    } catch (ClassNotFoundException e) {
        throw new IOException(域名ring("Read invocation data failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}

从解码dubbo body看出,从输入流解码获取调用的目标方法名称、方法类型、方法入参、隐式参数都保存到Invocation对象(即DecodeableRpcInvocation),其中读取入参和隐式参数使用到了序列化解码(需要使用到序列化id),而从输入流获取方法名称+参数类型并没有使用对象的反序列化。

dubbo provider处理接收总结

dubbo prodiver端从网络到dubbo业务线程池调用以及如何解码流程分析完,现在总结下:

image-20220109225136447

dubbo provider接收并处理consumer请求分两步

1.网络通信,在io线程上解码,解码结果保存到Request。

域名线程调起dubbo业务线程,传入解码结果Request,通过Invoker调用目标方法,传入要执行目标方法的对象、方法名、参数类型、参数进行调用目标方法。

该问题分析

解决2个问题

问题1:为什么在服务端报错ClassCastException,在服务端没有任何error日志呢?只有在客户端才有error日志

由于在dubbo代理类Wrapper2调用目标方法导致ClassCastException,异常被捕捉封装为InvocationTargetException向上抛,接着在域名域名域名ractProxyInvoker#invoke内异常被捕捉,封装为RpcResult,继而在ExceptionFilter内异常信息被封装为RuntimeException返回客户端。这中间并没有日志打印,因此不产生error日志,所以服务端看不到。

问题2:dubbo方法重载会导致问题吗?

结论,基本不会,dubbo的动态代理类WrapperX会根据Invocation的methodName+参数类型+参数进行调用目标方法,因此不会。网上有个大佬说dubbo方法重载在某种情况会导致问题,但是他写的语句有些不通顺且凌乱,而且蓝绿是流量隔离的,不会调错,我认为他的举例不合适,感兴趣的可以参考dubbo同名方法的问题及思考。

问题3:是否是未显式指定序列化id导致的呢?

经过前面分析,是由于判断参数类型是String(本来应该是DTO类型),导致执行目标方法时候把参数转换为String导致的异常,参数类型来源于Invocation对象(即域名meterTypes),而Invocation来源于域名a,而Request是网络通信解码得来,其中在域名域名域名deableRpcInvocation#decode(域名域名nel, 域名tStream)String desc = 域名UTF();从输入流读取字节流并解码为参数类型描述符,这个地方并不涉及到对象的序列化和反序列化。

看客户端编码代码InternalEncoder,编码参数类型代码如下图

image-20220111002311123

而客户端发送建立Request是在域名域名域名域名erExchangeChannel#request(域名ct, int),而Invocation对象是在dubbo调用的入口InvokerInvocationHandler内(new RpcInvocation(method, args)封装方法名+参数创建Invocation对象,继而参数类型就保存在了Invocation对象。

这样分析得来,不显示指定序列化id并不会导致这个问题。

排除了jdk版本、不显示指定序列化ID等原因,具体是什么原因导致的dubbo方法重载导致调用ClassCastException呢?线上预发环境和生产网络是互通,是否是预发环境同事手工部署的应用只有入参String的方法呢(未和生产同步版本)?同事也记不清了,也无法查,这个问题暂时是无法知道答案了。

据我猜测,问题可能出现是预发环境部署的服务没有和生产版本同步(缺少findProduct(ProductDTOdata)导致),我们预发和生成网络是互通的,应该是生产客户端调用到了预发环境服务,而预发环境部署的此服务没有findProduct(ProductDTOdata)。

为什么需要显示指定序列化id

rpc调用使用的tcp通信,需要把对象转换为二进制流进行发送(编码)和接收(解码),那么就需要有套规则需要把内存中的java对象转换为二进制流,序列化就是做这个事情的。

在使用原生序列化的时候,serialVersionUID起到了一个类似版本号的作用,在反序列化的时候判断serialVersionUID如果不相同,会抛出InvalidClassException。

如果在使用原生序列化方式的时候官方是强烈建议指定一个serialVersionUID的,如果没有指定,在序列化过程中,jvm会自动计算出一个值作为serialVersionUID,由于这种运行时计算serialVersionUID的方式依赖于jvm的实现方式,如果序列化和反序列化的jvm实现方式不一样可能会导致抛出异常InvalidClassException,所以强烈建议指定serialVersionUID。

不显示指定序列化ID实际会导致问题吗?

定义一个dubbo的入参,不显示指定序列化id,客户端运行不变更,服务端入参进行增加或删除字段(类结构发生变化),发现均能正常请求,并非像网上所说的不显示指定序列化id情况下rpc参数类结构变化,并没有导致什么问题,当然我只是在jdk8版本下进行了此测试(当然现在都是jdk8),这样情况下,实际使用过程中,不显示指定序列化id好像也不会影响什么呢。

网上有说法,不显示指定序列化id会导致一种情况出现问题:举个例子:比如该入参没有显示指定序列化id,后面有个需求需要在这个入参增加个字段,而且看没有显示指定序列化id,顺手就增加了个序列化id,这样线上运行的客户端应用由于引用的还是旧jar,新的服务部署上去,就会发送序列化失败(客户端jvm生成的序列化id和服务端显示指定的序列化id不同),好像这种情况是无法避免的。但是我经过测试,不显示指定序列化id情况下 对dubbo参数进行增加字段、删除字段、增加方法等都不会造成反序列化问题(jdk8, dubbo2.6.8下测试),请求均正常。验证结果说明jvm生成序列化id和类的结构没有关系。可以参考别人测试结果,和我测试结果相同。

那么是否就可以大胆的不指定序列化id呢?还是建议不要,鬼知道jvm生成序列化id的实现方式呢,不指定万一线上哪天出现幺蛾子。

验证了半天,得到一个不指定序列化id也没关系的实际验证结论,但是又不敢完全放心大胆不显示指定序列化id,抓狂。。。

最终结论

根据实际验证(jdk8, dubbo2.6.8下测试),不显示指定序列化id时,dubbo的传输对象在增加字段、删除字段、增加方法等都不会造成反序列化问题,但是还是强烈建议显示指定序列化id,万一jvm生成序列化id不兼容了呢

结尾

分析了这么长,最终也没找到这个问题的产生原因,但是对dubbo的通信层又加深了理解,下面一篇记录下总结的dubbo通信层

标签:编程
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。