0%

Flink剖析系列之Flink底层RPC通信机制

前言

对于分布式系统,内部组件众多,而组件之间的联系就需要一套非常高效的通信机制,Flink底层的RPC框架是基于Akka实现。 本文着重通过一些例子来分析flink底层的通信机制

文章结构

文章结构如下:

  1. Akka介绍与简单例子
  2. Flink RPC实例以及Rpc通信底层源码分析
  3. 调用远程的RPC流程
  4. 总结

Akka介绍与简单例子

Akka 是什么

  1. 一个开发并发、容错、可伸缩应用的框架
  2. 构建在JVM至上,基于Actor模型
  3. 定义一组规则,规定一组系统中每个模块之间如何交互,如何回应。

Actor 解决什么问题

开发高效率的并发程序,充分利用CPU资源。解决传统多线程方法的维护困难和容易发生错误的问题。对并发模型有一个更好的抽象。异步非阻塞。

Akka模型组成原理

下面是一张来自官网的图片,形象的介绍了Actor的内部模型。
image

Actor是最小的单元模块,系统有n个Actor组成,每个Actor有由mailbox和自身状态组成。

Actor和Actor是通过信件进行通信,每个Actor是串行处理每一条消息的。并且信件是不可变的。

例子

Akka的创建和执行流程

  • 构建ActorSystem
  • 创建Actor
    • 不能直接New一个Actor,而是你用通过actorSystem的actorOf方法创建,并且返回的是Actor的引用ActorRef,通过引用操作Actor
  • 发送消息
  • 回应消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class AkkaTest  {
private static ActorSystem actorSystem = null;
private ActorRef helloActor = null;
private ActorRef hiActor = null;
@BeforeClass
public static void setup() {
//构建ActorSystem
actorSystem = AkkaUtils.createDefaultActorSystem();

}
@Before
public void init() {
//构建Actor,获取该Actor的引用,即ActorRef
helloActor = actorSystem.actorOf(Props.create(HellowActor.class), "helloActor");
hiActor = actorSystem.actorOf(Props.create(HiActor.class), "hiActor");
}
@AfterClass
public static void teardown() throws Exception {
//关闭系统
actorSystem.terminate();
}
@Test
public void testSay() {
//通过hiActor给helloActor发送消息
helloActor.tell("jack", hiActor);
}
/**
* @author 奔跑的蜗牛
*/
public static class HellowActor extends AbstractActor {

@Override
public Receive createReceive() {
//根据消息类型路由处理方法
return ReceiveBuilder.create()
.matchAny(this::handleMessage)
.build();
}

private void handleMessage(Object message) {
//处理方法
System.out.println("hello!" + message);
//给发送者回信
getSender().tell("mary", this.getSelf());
}
}

/**
* @author 奔跑的蜗牛
*/
public static class HiActor extends AbstractActor {

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.matchAny(this::handleMessage)
.build();
}

private void handleMessage(Object message) {
System.out.println("hi! " + message);
}
}
}

运行结果

1
2
hello!jack
hi! mary

话不多说,直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class RpcEndpointTest extends TestLogger {

private static final Time TIMEOUT = Time.seconds(10L);
private static ActorSystem actorSystem = null;
private static RpcService rpcService = null;

@BeforeClass
public static void setup() {
//在这里会创建爱你一个actorSystem
actorSystem = AkkaUtils.createDefaultActorSystem();
//实例化一个AkkaRpcService,核心方法有startServer,stopServer和根据地质连接到一个Actor,并且返回RpcGateway
rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
}

@AfterClass
public static void teardown() throws Exception {
final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
FutureUtils
.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
}

/**
* Tests that we can obtain the self gateway from a RpcEndpoint and can interact with
* it via the self gateway.
*/
@Test
public void testSelfGateway() throws Exception {
int expectedValue = 1337;
BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue);
try {
//启动端点,这里面会启动actor
baseEndpoint.start();
//获取自身,这里是本地调用
BaseGateway baseGateway = baseEndpoint.getSelfGateway(BaseGateway.class);
CompletableFuture<Integer> foobar = baseGateway.foobar();
assertEquals(Integer.valueOf(expectedValue), foobar.get());
} finally {
RpcUtils.terminateRpcEndpoint(baseEndpoint, TIMEOUT);
}
}
//一个端点,实现了RpcEndpoint,并且实现了BaseGateway
public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {

private final int foobarValue;

protected BaseEndpoint(RpcService rpcService, int foobarValue) {
super(rpcService);

this.foobarValue = foobarValue;
}
//实现BaseGateway接口的方法,在接受远程调用的时候,返回的则是这个BaseGateway,可以调用这个方法
@Override
public CompletableFuture<Integer> foobar() {
return CompletableFuture.completedFuture(foobarValue);
}
}
public interface BaseGateway extends RpcGateway {
CompletableFuture<Integer> foobar();
}
}

上面出现了几个比较重要的类或者接口,具体类图如下,下面一个个来介绍

image

RpcGateway

有两个方法,getAddress和getHostname,首先想想,RPC,全程为远程过程调用,要想调用另外一个进程的过程,则需要知道这个过程的地址和端口,这个相当于是一个代理,或者说是一个网关,一个端点或者说进程想要被远程调用,则必须实现这个方法,并且一般由一个接口来继承RpcGateway,并且在这个接口中定义一些方法,然后在实现类中实现这些方法给远程调用。 而在RPC的客户端,则返回的是这个gateway。 基本所有的组件都继承了这个RpcGateway或者他的子接口。

RpcServer 和 AkkaInvocationHandler

比较难理解的就是这两个类,第一个接口,可以把它理解成一个和远端交互的能力,这个接口也实现了RpcGateway。

AkkaInvocationHandler是RpcServer的一个Akka实现,同时实现了InvocationHandler,表明也是一个代理调用处理器,他是由RpcService在一个endpoint启动的时候创建。

RpcEndpoint

这个类是和Actor绑定的,每个RpcEndpoint都有一个Actor对应,并且实现了RpcGateway接口
在RpcEndpoint中还定义了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法来执行Rpc调用,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint/进行Rpc调用时,其会委托RcpServer进行处理。

RpcService

这个类相当于一个服务工具类,主要起到根据Endpoint来启动 RpcServer(Actor),连接到某一个RpcServer,并且返回一个RpcGateway,停止服务等

AkkaRpcService

RpcService的实现类,也就是Akka的实现。封装了ActorSystem,RPC服务启动一个Akka参与者来接收来自RpcGateway}的RPC调用

请看下图和其中的解释。
image

在这个类中有一个Map 声明如下,他保存了每一个ActorRef和RpcEndpoint的映射关系,作用是在停止服务的时候停止actor

1
private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);

RPCEndPoint初始化过程

下面来看看具体一个RpcEndpoint实例化的源码解析

image

在实例化RpcEndpoint的时候,进入到构造函数中,传入了上面实例化的RpcService还有endpointId,每个endpoint都有一个唯一的endpointid,在构造函数中做了两件事:

1
2
3
4
5
6
7
8
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");

this.rpcServer = rpcService.startServer(this);

this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
  1. 启动rpcSerer,并且将返回的rpcServer赋值到类变量rpcServer上,注意,这里返回的rpcServer,是一个代理类,在调用rpcServer的时候,会被invoke方法拦截。
  2. 设置 this.mainThreadExecutor =MainThreadExecutor(rpcServer, this::validateRunsInMainThread),首先这个类的构造函数有一个参数为MainThreadExecutable gateway,RpcServer继承了此接口,而AkkaInvocationHandler实现了RpcServer,则也实现了这个接口,MainThreadExecutable接口中有 runAsync、callAsync、scheduleRunAsync等方法,标识在底层RPC端点的主线程中执行runnable。
    image
    而这个 MainThreadExecutor 又是一个Executor,所以在子类中调用类似下图的的方法,最终执行的还是这个RpcEndopoint所在的线程,可以共享上下文,并且不会有并发问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Executor which executes runnables in the main thread context.
*/
protected static class MainThreadExecutor
implements ComponentMainThreadExecutor {

private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;

MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) {
//这个gateway还是这个RpcEndpoint的rpcServer
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
}

public void runAsync(Runnable runnable) {
//在比如函数回调中,还是同一个线程政治性的,即rpcServer
gateway.runAsync(runnable);
}

public void scheduleRunAsync(Runnable runnable, long delayMillis) {
gateway.scheduleRunAsync(runnable, delayMillis);
}

public void execute(@Nonnull Runnable command) {
runAsync(command);
}
...

RpcEndpoint通信过程

当初始化完成之后,就可以发送消息,请看图。
image

start流程

上面实例化RpcEndpoint后,当中的rpcServer的状态Stop的,需要调用start方法

  • 调用RpcEndpoint#start
  • 转发给本身的RpcServer#start
  • 因为这个rpcServer是一个代理类,所以转发到了AkkaInvocationHandler中去了,被拦截了
    • 这里首先是获取这个方法的定义方法,如果是在AkkaBasedEndpoint、Object、RpcGateway、StartStoppable、MainThreadExecutable、RpcServer中调用的呢,代表是本地调用,或者说是初始化调用,肯定是不涉及到远程的,则直接调用响应方法就可以了
    • 这里传入的对象是自己本身。
  • 调用AkkaInvocationHandler#start;
  • 通过ActorRef#tell给对应的Actor发送消息rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());;
  • 调用AkkaRpcActor#handleControlMessage处理控制类型消息;
  • 在主线程中将自身状态变更为Started状态;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();
Object result;
if (declaringClass.equals(AkkaBasedEndpoint.class) ||
declaringClass.equals(Object.class) ||
declaringClass.equals(RpcGateway.class) ||
declaringClass.equals(StartStoppable.class) ||
declaringClass.equals(MainThreadExecutable.class) ||
declaringClass.equals(RpcServer.class)) {
result = method.invoke(this, args);
} else if (declaringClass.equals(FencedRpcGateway.class)) {
...
} else {
result = invokeRpc(method, args);
}
return result;
}

调用远程的RPC流程

要实现远程调用,主要通过 AkkaRpcService的connect方法实现,连个参数,一个address,一个clazz,这个clazz是这个方法要返回的代理类的接口类型,比如DispatcherGateway ,他定义了很多方法。

AkkaRpcService.java#connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(
final String address,
final Class<C> clazz) {

return connectInternal(
address,
clazz,
(ActorRef actorRef) -> {
//这里定义了一个Function,在下面ask远程endpoint之后,
//返回了ActorRef,还有address和host,
//则根据这个创建一个Akka执行处理器,调用远程就像调用本地的方法一样,爽
Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

return new AkkaInvocationHandler(
addressHostname.f0,
addressHostname.f1,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
null);
});
}

然后继续往下走,定义了connectInternal方法,,请看注释

AkkaRpcService.java#connectInternal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

private <C extends RpcGateway> CompletableFuture<C> connectInternal(
final String address,
final Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
checkState(!stopped, "RpcService is stopped");

LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
address, clazz.getName());
//使用actorSystem来选择actor
final ActorSelection actorSel = actorSystem.actorSelection(address);
//得到一个唯一actor定义
final Future<ActorIdentity> identify = Patterns
.ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
.<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
//异步调用,在拿到结果之后返回这个ActorRef
(ActorIdentity actorIdentity) -> {
if (actorIdentity.getRef() == null) {
throw new CompletionException(new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'));
} else {
return actorIdentity.getRef();
}
});

final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
actorRefFuture.thenCompose(
(ActorRef actorRef) -> FutureUtils.toJava(
//调用远程,在远程Actor中会处理RemoteHandshakeMessage类型的消息,主要判断远程的Endpoint是否是实现了传入的gateway接口并且判断版本
Patterns
.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));

return actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
//搞定之后拿到上面创建的执行处理器
InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);

// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();
//构造一个代理对象,然后后面就可以调用RpcGateway跟调用本地一样,非常愉快的
@SuppressWarnings("unchecked")
C proxy = (C) Proxy.newProxyInstance(
classLoader,
new Class<?>[]{clazz},
invocationHandler);

return proxy;
},
actorSystem.dispatcher());
}

AkkaRpcActor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
if (!isCompatibleVersion(handshakeMessage.getVersion())) {
sendErrorIfSender(new AkkaHandshakeException(
String.format(
"Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",
handshakeMessage.getVersion(),
getVersion())));
} else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
sendErrorIfSender(new AkkaHandshakeException(
String.format(
"The rpc endpoint does not support the gateway %s.",
handshakeMessage.getRpcGateway().getSimpleName())));
} else {
getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
}
}

总结

主要介绍了rpc通信的核心类,以及Actor初始化流程和远程调用的流程。

好类,总算讲了个大概,flink源码博大精深,有很多地方值得我们学习的,鄙人才疏学浅,可能有疏漏之处,也可能有些地方讲的不对,请多指教。