Dubbo 压测插件的实现——基于 Gatling
Dubbo 压测插件已开源,本文涉及代码详见gatling-dubbo
Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩展分布式能力,开发了自己的全链路压测引擎 MAXIM。全链路压测中我们主要模拟用户实际使用场景,使用 HTTP 接口作为压测入口,但有赞目前后端服务中 Dubbo 应用比重越来越高,如果可以知道 Dubbo 应用单机水位将对我们把控系统后端服务能力大有裨益。基于 Gatling 的优势和在有赞的使用基础,我们扩展 Gatling 开发了 gatling-dubbo 压测插件。
插件主要结构
实现 Dubbo 压测插件,需实现以下四部分内容:
- Protocol 和 ProtocolBuild
协议部分,这里主要定义 Dubbo 客户端相关内容,如协议、泛化调用、服务 URL、注册中心等内容,ProtocolBuild 则为 DSL 使用 Protocol 的辅助类 - Action 和 ActionBuild
执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类 - Check 和 CheckBuild
检查部分,全链路压测中我们都使用Json Path
检查请求结果,这里我们实现了一样的检查逻辑。CheckBuild 则为 DSL 使用 Check 的辅助类 - DSL
Dubbo 插件的领域特定语言,我们提供了一套简单易用的 API 方便编写 Duboo 压测脚本,风格上与原生 HTTP DSL 保持一致
Protocol
协议部分由 5 个属性组成,这些属性将在 Action 初始化 Dubbo 客户端时使用,分别是:
- protocol
协议,设置为dubbo
- generic
泛化调用设置,Dubbo 压测插件使用泛化调用发起请求,所以这里设置为true
,有赞优化了泛化调用的性能,为了使用该特性,引入了一个新值result_no_change
(去掉优化前泛化调用的序列化开销以提升性能) - url
Dubbo 服务的地址:dubbo://IP地址:端口
- registryProtocol
Dubbo 注册中心的协议,设置为ETCD3
- registryAddress
Dubbo 注册中心的地址
如果是测试 Dubbo 单机水位,则设置 url,注册中心设置为空;如果是测试 Dubbo 集群水位,则设置注册中心(目前支持 ETCD3),url 设置为空。由于目前注册中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏灵活性,所以我们又实现了客户端层面的负载均衡,如此便可抛开特定的注册中心来测试 Dubbo 集群水位。该特性目前正在内测中。
object DubboProtocol {
val DubboProtocolKey = new ProtocolKey {
type Protocol = DubboProtocol
type Components = DubboComponents
def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]
def defaultProtocolValue(configuration: GatlingConfiguration): DubboProtocol = throw new IllegalStateException("Can't provide a default value for DubboProtocol")
def newComponents(system: ActorSystem, coreComponents: CoreComponents): DubboProtocol => DubboComponents = {
dubboProtocol => DubboComponents(dubboProtocol)
}
}
}
case class DubboProtocol(
protocol: String, //dubbo
generic: String, //泛化调用?
url: String, //use url or
registryProtocol: String, //use registry
registryAddress: String //use registry
) extends Protocol {
type Components = DubboComponents
}
为了方便 Action 中使用上面这些属性,我们将其装进了 Gatling 的 ProtocolComponents:
case class DubboComponents(dubboProtocol: DubboProtocol) extends ProtocolComponents {
def onStart: Option[Session => Session] = None
def onExit: Option[Session => Unit] = None
}
以上就是关于 Protocol 的定义。为了能在 DSL 中配置上述 Protocol,我们定义了 DubboProtocolBuilder,包含了 5 个方法分别设置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 个属性。
object DubboProtocolBuilderBase {
def protocol(protocol: String) = DubboProtocolBuilderGenericStep(protocol)
}
case class DubboProtocolBuilderGenericStep(protocol: String) {
def generic(generic: String) = DubboProtocolBuilderUrlStep(protocol, generic)
}
case class DubboProtocolBuilderUrlStep(protocol: String, generic: String) {
def url(url: String) = DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url)
}
case class DubboProtocolBuilderRegistryProtocolStep(protocol: String, generic: String, url: String) {
def registryProtocol(registryProtocol: String) = DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol)
}
case class DubboProtocolBuilderRegistryAddressStep(protocol: String, generic: String, url: String, registryProtocol: String) {
def registryAddress(registryAddress: String) = DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress)
}
case class DubboProtocolBuilder(protocol: String, generic: String, url: String, registryProtocol: String, registryAddress: String) {
def build = DubboProtocol(
protocol = protocol,
generic = generic,
url = url,
registryProtocol = registryProtocol,
registryAddress = registryAddress
)
}
Action
DubboAction 包含了 Duboo 请求逻辑、请求结果校验逻辑以及压力控制逻辑,需要扩展 ExitableAction 并实现 execute 方法。
DubboAction 类的域 argTypes、argValues 分别是泛化调用请求参数类型和请求参数值,需为 Expression[] 类型,这样当使用数据 Feeder 作为压测脚本参数输入时,可以使用类似 ${args_types}
、${args_values}
这样的表达式从数据 Feeder 中解析对应字段的值。
execute 方法必须以异步方式执行 Dubbo 请求,这样前一个 Dubbo 请求执行后但还未等响应返回时虚拟用户就可以通过 AKKA Message 立即发起下一个请求,如此一个虚拟用户可以在很短的时间内构造大量请求。请求方式方面,相比于泛化调用,原生 API 调用需要客户端载入 Dubbo 服务相应的 API 包,但有时候却拿不到,此外,当被测 Dubbo 应用多了,客户端需要载入多个 API 包,所以出于使用上的便利性,Dubbo 压测插件使用泛化调用发起请求。
异步请求响应后会执行 onComplete 方法,校验请求结果,并根据校验结果记录请求成功或失败日志,压测报告就是使用这些日志统计计算的。
为了控制压测时的 RPS,则需要实现 throttle 逻辑。实践中发现,高并发情况下,泛化调用性能远不如原生 API 调用性能,且响应时间成倍增长(如此不能表征 Dubbo 应用的真正性能),导致 Dubbo 压测插件压力控制不准,解决办法是优化泛化调用性能,使之与原生 API 调用的性能相近,请参考 **dubbo 泛化调用性能优化**。
class DubboAction(
interface: String,
method: String,
argTypes: Expression[Array[String]],
argValues: Expression[Array[Object]],
genericService: GenericService,
checks: List[DubboCheck],
coreComponents: CoreComponents,
throttled: Boolean,
val objectMapper: ObjectMapper,
val next: Action
) extends ExitableAction with NameGen {
override def statsEngine: StatsEngine = coreComponents.statsEngine
override def name: String = genName("dubboRequest")
override def execute(session: Session): Unit = recover(session) {
argTypes(session) flatMap { argTypesArray =>
argValues(session) map { argValuesArray =>
val startTime = System.currentTimeMillis()
val f = Future {
try {
genericService.$invoke(method, argTypes(session).get, argValues(session).get)
} finally {
}
}
f.onComplete {
case Success(result) =>
val endTime = System.currentTimeMillis()
val resultMap = result.asInstanceOf[JMap[String, Any]]
val resultJson = objectMapper.writeValueAsString(resultMap)
val (newSession, error) = Check.check(resultJson, session, checks)
error match {
case None =>
statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("OK"), None, None)
throttle(newSession(session))
case Some(Failure(errorMessage)) =>
statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage))
throttle(newSession(session).markAsFailed)
}
case FuFailure(e) =>
val endTime = System.currentTimeMillis()
statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage))
throttle(session.markAsFailed)
}
}
}
}
private def throttle(s: Session): Unit = {
if (throttled) {
coreComponents.throttler.throttle(s.scenario, () => next ! s)
} else {
next ! s
}
}
}
DubboActionBuilder 则是获取 Protocol 属性并初始化 Dubbo 客户端:
case class DubboActionBuilder(interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], checks: List[DubboCheck]) extends ActionBuilder {
private def components(protocolComponentsRegistry: ProtocolComponentsRegistry): DubboComponents =
protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey)
override def build(ctx: ScenarioContext, next: Action): Action = {
import ctx._
val protocol = components(protocolComponentsRegistry).dubboProtocol
//Dubbo客户端配置
val reference = new ReferenceConfig[GenericService]
val application = new ApplicationConfig
application.setName("gatling-dubbo")
reference.setApplication(application)
reference.setProtocol(protocol.protocol)
reference.setGeneric(protocol.generic)
if (protocol.url == "") {
val registry = new RegistryConfig
registry.setProtocol(protocol.registryProtocol)
registry.setAddress(protocol.registryAddress)
reference.setRegistry(registry)
} else {
reference.setUrl(protocol.url)
}
reference.setInterface(interface)
val cache = ReferenceConfigCache.getCache
val genericService = cache.get(reference)
val objectMapper: ObjectMapper = new ObjectMapper()
new DubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next)
}
}
LambdaProcessBuilder 则提供了设置 Dubbo 泛化调用入参的 DSL 以及接下来要介绍的 Check 部分的 DSL
case class DubboProcessBuilder(interface: String, method: String, argTypes: Expression[Array[String]] = _ => Success(Array.empty[String]), argValues: Expression[Array[Object]] = _ => Success(Array.empty[Object]), checks: List[DubboCheck] = Nil) extends DubboCheckSupport {
def argTypes(argTypes: Expression[Array[String]]): DubboProcessBuilder = copy(argTypes = argTypes)
def argValues(argValues: Expression[Array[Object]]): DubboProcessBuilder = copy(argValues = argValues)
def check(dubboChecks: DubboCheck*): DubboProcessBuilder = copy(checks = checks ::: dubboChecks.toList)
def build(): ActionBuilder = DubboActionBuilder(interface, method, argTypes, argValues, checks)
}
Check
全链路压测中,我们都使用Json Path
校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于Json Path
的校验。实现 Check,必须实现 Gatling check 中的 Extender 和 Preparer:
package object dubbo {
type DubboCheck = Check[String]
val DubboStringExtender: Extender[DubboCheck, String] =
(check: DubboCheck) => check
val DubboStringPreparer: Preparer[String, String] =
(result: String) => Success(result)
}
基于Json Path
的校验逻辑:
trait DubboJsonPathOfType {
self: DubboJsonPathCheckBuilder[String] =>
def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers)
}
object DubboJsonPathCheckBuilder {
val CharsParsingThreshold = 200 * 1000
def preparer(jsonParsers: JsonParsers): Preparer[String, Any] =
response => {
if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson)
jsonParsers.safeParseJackson(response)
else
jsonParsers.safeParseBoon(response)
}
def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType
}
class DubboJsonPathCheckBuilder[X: JsonFilter](
private[check] val path: Expression[String],
private[check] val jsonParsers: JsonParsers
)(implicit extractorFactory: JsonPathExtractorFactory)
extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X](
DubboStringExtender,
DubboJsonPathCheckBuilder.preparer(jsonParsers)
) {
import extractorFactory._
def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence))
def findAllExtractor = path.map(newMultipleExtractor[X])
def countExtractor = path.map(newCountExtractor)
}
DubboCheckSupport 则提供了设置 jsonPath 表达式的 DSL
trait DubboCheckSupport {
def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
DubboJsonPathCheckBuilder.jsonPath(path)
}
- Dubbo 压测脚本中可以设置一个或多个 check 校验请求结果,使用 DSL check 方法 *
DSL
trait AwsDsl
提供顶层 DSL。我们还定义了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 两个 Scala 隐式方法,以自动构造 DubboProtocol 和 ActionBuilder。
此外,泛化调用中使用的参数类型为 Java 类型,而我们的压测脚本使用 Scala 编写,所以这里需要做两种语言间的类型转换,所以我们定义了 transformJsonDubboData 方法
trait DubboDsl extends DubboCheckSupport {
val Dubbo = DubboProtocolBuilderBase
def dubbo(interface: String, method: String) = DubboProcessBuilder(interface, method)
implicit def dubboProtocolBuilder2DubboProtocol(builder: DubboProtocolBuilder): DubboProtocol = builder.build
implicit def dubboProcessBuilder2ActionBuilder(builder: DubboProcessBuilder): ActionBuilder = builder.build()
def transformJsonDubboData(argTypeName: String, argValueName: String, session: Session): Session = {
session.set(argTypeName, toArray(session(argTypeName).as[JList[String]]))
.set(argValueName, toArray(session(argValueName).as[JList[Any]]))
}
private def toArray[T:ClassTag](value: JList[T]): Array[T] = {
value.asScala.toArray
}
}
object Predef extends DubboDsl
Dubbo 压测脚本和数据 Feeder 示例
压测脚本示例:
import io.gatling.core.Predef._
import io.gatling.dubbo.Predef._
import scala.concurrent.duration._
class DubboTest extends Simulation {
val dubboConfig = Dubbo
.protocol("dubbo")
.generic("true")
//直连某台Dubbo机器,只单独压测一台机器的水位
.url("dubbo://IP地址:端口")
//或设置注册中心,压测该Dubbo应用集群的水位,支持ETCD3注册中心
.registryProtocol("")
.registryAddress("")
val jsonFileFeeder = jsonFile("data.json").circular //数据Feeder
val dubboScenario = scenario("load test dubbo")
.forever("repeated") {
feed(jsonFileFeeder)
.exec(session => transformJsonDubboData("args_types1", "args_values1", session))
.exec(dubbo("com.xxx.xxxService", "methodName")
.argTypes("${args_types1}")
.argValues("${args_values1}")
.check(jsonPath("$.code").is("200"))
)
}
setUp(
dubboScenario.inject(atOnceUsers(10))
.throttle(
reachRps(10) in (1 seconds),
holdFor(30 seconds))
).protocols(dubboConfig)
}
data.json 示例:
[
{
"args_types1": ["com.xxx.xxxDTO"],
"args_values1": [{
"field1": "111",
"field2": "222",
"field3": "333"
}]
}
]
Dubbo 压测报告示例
我的系列博客
混沌工程 - 软件系统高可用、弹性化的必由之路
异步系统的两种测试方法
我的其他测试相关开源项目
捉虫记:方便产品、开发、测试三方协同自测的管理工具
招聘
有赞测试组在持续招人中,大量岗位空缺,只要你来,就能帮你点亮全栈开发技能树,有意向换工作的同学可以发简历到 sunjun【@】youzan.com