博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
akka-grpc - 应用案例
阅读量:1495 次
发布时间:2019-04-20

本文共 9757 字,大约阅读时间需要 32 分钟。

  上期说道:http/2还属于一种不算普及的技术协议,可能目前只适合用于内部系统集成,现在开始大面积介入可能为时尚早。不过有些项目需求不等人,需要使用这项技术,所以研究了一下akka-grpc,写了一篇介绍。本想到此为止,继续其它项目。想想这样做法有点不负责任,像是草草收场。毕竟用akka-grpc做了些事情,想想还是再写这篇跟大家分享使用kka-grpc的过程。

我说过,了解akka-grpc的主要目的还是在protobuf的应用上。这是一种高效率的序列化协议。刚好,公司有这么个项目,是一个图像处理平台:把很多图片拍摄终端的图像传上平台进行商品识别、OCR等图像处理。由于终端数量多、图像处理又特别消耗内存、CPU等计算资源、又要求快速响应,所以第一考虑就是使用akka-cluster把图像处理任务分割到多个节点上并行处理。这里就需要仔细考虑图片在终端到平台、然后集群节点与点actor间的传输效率了。如何在akka系统里使用protobuf格式的数据正是本篇讨论和示范的目的。

akka-grpc应用一般从IDL文件里消息类型和服务函数的定义开始,如下面这个.proto文件示范:

syntax = "proto3";import "google/protobuf/wrappers.proto";import "google/protobuf/any.proto";import "scalapb/scalapb.proto";option (scalapb.options) = {  // don't append file name to package  flat_package: true  // generate one Scala file for all messages (services still get their own file)  single_file: true  // add imports to generated file  // useful when extending traits or using custom types  // import: "io.ontherocks.hellogrpc.RockingMessage"  // code to put at the top of generated file  // works only with `single_file: true`  //preamble: "sealed trait SomeSealedTrait"};package com.datatech.pos.abs;message UCredential {  string userid = 1;  string password = 2;}message JWToken {  string jwt = 1;}message Picture {  int32 num = 1;  bytes blob = 2;}message Capture {  string ean = 1;  bytes cover1 = 2;  bytes cover2 = 3;}message Book {  string ean = 1;  string ver = 2;  string isbn = 3;  string title = 4;  string publisher = 5;  double price = 6;  bytes cover1 = 7;  bytes cover2 = 8;}message QueryResult {  int32  sts         = 1;  string msg         = 2;  Book bookinfo   = 3;}service Services {  rpc GetAuthToken(UCredential) returns (JWToken) {};  rpc SavePicture(Picture) returns (QueryResult) {};  rpc GetPicture(Picture) returns (Picture) {};//  rpc SaveCapture(Capture) returns (QueryResult) {};//  rpc GetCapture(Capture) returns (Capture) {};//  rpc GetBookInfo(Capture) returns (QueryResult) {};}

因为这次示范针对的是protobuf的使用,所以就拣了SavePicture,GetPicture这两项服务函数。JWToken只是用户身份凭证,集群分片shard-entityId是以用户凭证为基础的,所以平台需要通过JWT进行跨节点任务指派以实现分布式图像处理运算。

下面就要在编译器插件自动产生的基础服务接口代码基础上进行具体的服务功能实现。这部分主要是对接口函数的实现(oveerride):

class gRPCServices(trace: Boolean, system: ActorSystem, sharding: ClusterSharding)(  implicit  waitResponseTimeout: Timeout, authenticator: AuthBase) extends ServicesPowerApi with LogSupport {  implicit val ec = system.dispatcher  log.stepOn = trace  override def getAuthToken(request: UCredential, meta: Metadata): Future[JWToken] = {    val entityRef = sharding.entityRefFor(Authenticator.EntityKey, UUID.randomUUID.toString)    val jwtResp = for {      ui <- entityRef.ask[Authenticator.Response](Authenticator.GetUserInfo(request.userid, _))        .map {          case Authenticator.UserInfo(info) => info          case _ => Map[String, Any]()        }      jwt <- entityRef.ask[Authenticator.Response](Authenticator.GetToken(ui, _))    } yield jwt    jwtResp.map {      case Authenticator.JWToken(jwt) =>        if (jwt.nonEmpty) JWToken(jwt)        else throw new Exception("身份验证失败!无法提供凭证。")      case _ => throw new Exception("身份验证失败!无法提供凭证。")    }  }  override def savePicture(in: Picture, metadata: Metadata): Future[QueryResult] = {    val jwt = getJwt(metadata).getOrElse("")    val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","",""))    val (shopId, posId, termId, impurl,devId) = ids    val entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId")    val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.SaveImage(in, _))      .map {        case ImgProcessor.ValidImgPro(img) => QueryResult(sts = 0, msg = "picture saved.")        case ImgProcessor.FailedImgPro(msg) => QueryResult(sts = -1, msg = msg)      }    futResp  }  override def getPicture(in: Picture, metadata: Metadata): Future[Picture] = {    val jwt = getJwt(metadata).getOrElse("")    val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","",""))    val (shopId, posId, termId, impurl,devId) = ids    val entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId")    val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.GetImage(in.num, _))      .map {        case ImgProcessor.ValidImgPro(img) => img        case ImgProcessor.FailedImgPro(msg) => Picture(-1, ByteString.EMPTY)      }    futResp  }  def getJwt(metadata: Metadata): Option[String] = {    metadata.getText("bearer")  }}

由于是通过PowerApi模式产生的接口代码,所以接口函数都带有MetaData参数,代表HttpRequest header集合。可以看到:服务函数实现都是通过entityRef,一个分片调度器分配到集群某个节点ImgProcessor.EntityKey类型的entity-actor上进行的。shopId:posId就是代表为某用户构建的entityId,这个是通过用户在Request中提供的MetaData参数中jwt解析得出的。

可以看到,具体服务提供是通过集群的分片实现的。下面是这个分片的代码示范:

log.step(s"initializing sharding for ${ImgProcessor.EntityKey} ...")(MachineId("",""))      val imgEntityType = Entity(ImgProcessor.EntityKey) { entityContext =>        ImgProcessor(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)      }.withStopMessage(ImgProcessor.StopWorker)      sharding.init(imgEntityType)

上面imgEntityType就是shard-entity类型,其实就是按用户提供的jwt在任意集群节点上实时构建的一个opencv图像处理器。下面是这个entity-actor的示范代码:

object ImgProcessor extends LogSupport {  sealed trait Command extends CborSerializable  case class SaveImage(img: Picture, replyTo: ActorRef[Response]) extends Command  case class GetImage(imgnum: Int,replyTo: ActorRef[Response]) extends Command  sealed trait Response extends CborSerializable  case class ValidImgPro(img: Picture) extends Response  case class FailedImgPro(msg: String) extends Response  def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {    val (shopId,posId) = entityId.split(':').toList match {      case sid::pid::Nil  => (sid,pid) }    implicit val loc = Messages.MachineId(shopId,posId)    log.stepOn = trace    Behaviors.setup[Command] { ctx =>      implicit val ec = ctx.executionContext      ctx.setReceiveTimeout(keepAlive, Idle)      Behaviors.withTimers[Command] { timer =>        Behaviors.receiveMessage[Command] {          case SaveImage(img, replyTo) =>            log.step(s"ImgProcessor: SaveImage(${img.num})")            implicit val client = mongoClient(mgoHosts)            maybeMgoClient = Some(client)            ctx.pipeToSelf(savePicture(img)) {              case Success(_) => {                  replyTo ! ValidImgPro(img)                  Done(loc.shopid, loc.posid, s"saved image #${img.num}.")              }              case Failure(err) =>                log.error(s"ImgProcessor: SaveImage Error: ${err.getMessage}")                replyTo ! FailedImgPro(err.getMessage)                Done(loc.shopid, loc.posid, s"SaveImage with error: ${err.getMessage}")            }            Behaviors.same          case GetImage(imgnum, replyTo) =>...  }}

整个图片传输是通过actor的消息实现的。akka消息支持多种序列化格式,包括protobuf, 在配置文件.conf里定义:

akka {  loglevel = INFO  actor {    provider = cluster    serializers {      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"      proto = "akka.remote.serialization.ProtobufSerializer"    }    serialization-bindings {      "com.datatech.pos.abs.CborSerializable" = jackson-cbor      "scalapb.GeneratedMessage" = proto    }  }}

grpc server 基本上是个标准模块,不同的只是service参数:

class gRPCServer(host: String, port: Int) extends LogSupport {  def runServer(system: ActorSystem[_], service: gRPCServices): Future[Http.ServerBinding] = {    implicit val classic = system.toClassic    implicit val ec: ExecutionContext = system.executionContext    // Create service handlers    val serviceHandler: HttpRequest => Future[HttpResponse] =      ServicesPowerApiHandler(service)    // Bind service handler servers to localhost:8080/8081    val binding = Http().bindAndHandleAsync(      serviceHandler,      interface = host,      port = port,      connectionContext = HttpConnectionContext())    // report successful binding    binding.foreach { binding => println(s"******* startup gRPC-server on: port = $port  *******") }    binding    //#server  }}

下面是客户端测试代码:

object gRPCTestClient {  def main(args: Array[String]): Unit = {    val config_onenode = ConfigFactory.load("onenode")    implicit val sys = ActorSystem("grpc-client", config_onenode)    implicit val ec = sys.dispatcher    val clientSettings = GrpcClientSettings.fromConfig(Services.name)    //   val clientSettings = GrpcClientSettings.connectToServiceAt("192.168.11.189", 50052);    implicit val client = ServicesClient(clientSettings)    val futJwt = client.getAuthToken(UCredential("9013", "123456"))    val jwt = Await.result(futJwt, 5.seconds).jwt    println(s"got jwt: ${jwt}")    scala.io.StdIn.readLine()    val bytes = FileStreaming.FileToByteArray("books/59c10d099b26e.jpg")    val mat = bytesToMat(bytes)    show(mat,"sent picture")    scala.io.StdIn.readLine()    val picture = Picture(111,marshal(bytes))    val futQR = client.savePicture().addHeader("Bearer", jwt).invoke(Picture(111,marshal(bytes)))    futQR.onComplete {      case Success(qr) => println(s"Saving Success: ${qr.msg}")      case Failure(err) => println(s"Saving Error: ${err.getMessage}")    }    scala.io.StdIn.readLine()    val futPic = client.getPicture().addHeader("Bearer", jwt).invoke(Picture(111,ByteString.EMPTY))    futPic.onComplete {      case Success(pic) =>        val image = bytesToMat(unmarshal(pic.blob))        show(image, s"picture:${pic.num}")      case Failure(err) => println(s"Reading Error: ${err.getMessage}")    }    scala.io.StdIn.readLine()    sys.terminate()  }}

基本流程是:先通过getAuthToken获取jwt;在调用服务时通过addHeader("bearer",jwt)把jwt随着函数调用一起提交给服务端。

客户端设置可以在配置文件中定义:

akka {  loglevel = INFO  grpc.client {    "com.datatech.pos.abs.Services" {      host = 192.168.11.189      port = 52051      override-authority = foo.test.google.fr      use-tls = false    }  }}

 

转载地址:http://phmgu.baihongyu.com/

你可能感兴趣的文章
英语小短文翻译1——The Significance of Books
查看>>
杂乱的Java笔记(未完)
查看>>
Java的类与类之间的USES-A和HAS-A及区分
查看>>
Java中类的构造方法
查看>>
java成员变量的使用
查看>>
java方法的重载
查看>>
在Windows命令行运行java程序
查看>>
java的可扩展性
查看>>
java中JDK的安装
查看>>
Java之数组小知识(加内存图)
查看>>
关于Java的一些图形awt介绍
查看>>
java之StringBuilder和关于数组怎么扩容
查看>>
java中的形参为引用类型的问题
查看>>
模拟太阳系项目
查看>>
c语言基础语法六——结构体(完结)
查看>>
我的大二上学期
查看>>
安卓——eclipse真机连接问题解决1
查看>>
安卓——按钮的四种点击事件
查看>>
c语言基础语法三——数组
查看>>
链表操作——多项式加减乘
查看>>