当前位置: 首页 > article >正文

【Flink源码分析】3. Flink1.19源码分析-精通动态代理

3.1 Java 动态代理

动态代理是一种设计模式,它允许在运行时创建代理对象,并将方法调用重定向到不同的实际对象。它使我们能够在不修改现有代码的情况下增加或改变某个对象的行为。

3.1.1 InvocationHandler接口:

这个接口定义了一个invoke方法,该方法在代理实例上的方法被调用时调用。

public interface InvocationHandler {
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;
}

3.1.2 Proxy类:

这个类提供了创建动态代理类和实例的静态方法。

   public static Object newProxyInstance(ClassLoader loader,
                                          Class<?>[] interfaces,
                                          InvocationHandler h)
        throws IllegalArgumentException

3.1.2.1 ClassLoader loader:

这个类加载器用于定义代理类的类加载器。通常,我们可以使用被代理对象的类加载器,即targetObject.getClass().getClassLoader()。
代理类必须和它所表示的接口在同一个类加载器的命名空间中,以确保代理类能够访问被代理的接口。
总结:类加载器,targetObject.getClass().getClassLoader()。

3.1.2.2 Class<?>[] interfaces:

这是一个接口数组,表示代理类需要实现的接口。
代理类将实现这些接口,并可以在运行时动态地调用这些接口的方法。
总结:动态代理类会调用实现该接口的方法。

3.1.2.3 InvocationHandler h:

这是一个调用处理程序,它负责实现接口中的方法调用。
当代理类的方法被调用时,实际上会调用这个 InvocationHandler 的 invoke 方法。 invoke 方法会接受被调用的方法、方法的参数以及代理实例本身作为参数。
总结:动态代理类调用方法的时候,会流转到 invoke 方法中,在 invoke 方法中可以完成我们要做的操作,比如打印日志。

3.2 Java 动态代理Demo

3.2.1 ResourceManagerGateway 接口

package com.annn.fink.proxy;

/**
 * 模拟Flink为代理目标对象定义一个接口
 */
public interface ResourceManagerGateway {
    /**
     * 定义一个注册方法
     */
    void registerTaskExecutor();
}

3.2.2 ResourceManager 类

package com.annn.fink.proxy;

/**
 * 创建实现该接口的目标对象
 */
public class ResourceManager implements ResourceManagerGateway{

    /**
     * 实现方法中打印一句话
     */
    @Override
    public void registerTaskExecutor() {
        System.out.println("注册 registerTaskExecutor ");
    }
}

3.2.3 PekkoInvocationHandler 类

package com.annn.fink.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * 创建实现 InvocationHandler 接口的类。
 */
public class PekkoInvocationHandler implements InvocationHandler {

    private Object target; //被代理的对象

    public PekkoInvocationHandler(Object target) {
        this.target = target;
    }

    /**
     * 在 invoke 中调用内部方法 invokeRpc
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return invokeRpc(method,args);
    }

    /**
     * 在 invokeRpc 中实现自己的逻辑,比如向 ResourceManager 发送 Pekko 的请求
     * flink内部实现的时候会将调用的代理类和方法封装成 RpcInvocation 调用 ask 方法发送给 PekkoRpcActor 接收到消息
     * 内部调用 HandlerMessage 处理不同类型的请求然后通过 java 反射调用最终调用传递给 ResourceManager.registerTaskExecutor 方法
     *
     * @param method
     * @param args
     * @return
     * @throws Exception
     */
    private Object invokeRpc(Method method, Object[] args) throws Exception {
        System.out.println("调用pekko ask方法向 ResourceManager 发送调用的方法");
        Object invoke = method.invoke(target, args);
        System.out.println("结束调用");
        return invoke;
    }
}

3.2.4 Demo

package com.annn.fink.proxy;

import java.lang.reflect.Proxy;

public class Demo {
    public static void main(String[] args) {
        // 创建目标对象
        ResourceManager myObject = new ResourceManager();
        // 创建 InvocationHandler
        PekkoInvocationHandler handler = new PekkoInvocationHandler(myObject);
        // 调用 Proxy.newProxyInstance 静态方法创建动态代理类
        ResourceManagerGateway proxy = (ResourceManagerGateway) Proxy
                .newProxyInstance(myObject.getClass().getClassLoader(),
                        new Class<?>[] { ResourceManagerGateway.class },
                        handler);
        // 调用 registerTaskExecutor 注册方法最终会调用 PekkoInvocationHandler 的 invoke 方法
        proxy.registerTaskExecutor();
    }
}

3.2.5 运行结果

调用pekko ask方法向 ResourceManager 发送调用的方法
注册 registerTaskExecutor 
结束调用

3.2.6 总结动态代理

  1. 定义一个接口
  2. 定义接口实现类
  3. 定义 InvocationHandler
  4. 定义服务,在服务中调用 Proxy.newProxyInstance() 方法创建动态代理

3.3 Flink RPC中的动态代理详解

3.3.1 ResourceManagerGateway 接口

public interface ResourceManagerGateway
        extends FencedRpcGateway<ResourceManagerId>, ClusterPartitionManager, BlocklistListener {
    CompletableFuture<RegistrationResponse> registerTaskExecutor(
            TaskExecutorRegistration taskExecutorRegistration, @RpcTimeout Time timeout);
}

3.3.2 ResourceManager 实现类

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId>
        implements DelegationTokenManager.Listener, ResourceManagerGateway {
    @Override
    public CompletableFuture<RegistrationResponse> registerTaskExecutor(
            final TaskExecutorRegistration taskExecutorRegistration, final Time timeout) {
            
	}
}

3.3.3 PekkoInvocationHandler 类

class PekkoInvocationHandler implements InvocationHandler, PekkoBasedEndpoint, RpcServer {

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        Object result;

        if (declaringClass.equals(PekkoBasedEndpoint.class)
                || declaringClass.equals(Object.class)
                || declaringClass.equals(RpcGateway.class)
                || declaringClass.equals(StartStoppable.class)
                || declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) {
            throw new UnsupportedOperationException(
                    "InvocationHandler does not support the call FencedRpcGateway#"
                            + method.getName()
                            + ". This indicates that you retrieved a FencedRpcGateway without specifying a "
                            + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to "
                            + "retrieve a properly FencedRpcGateway.");
        } else {
            result = invokeRpc(method, args);
        }

        return result;
    }
    
	private Object invokeRpc(Method method, Object[] args) throws Exception {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        final boolean isLocalRpcInvocation = method.getAnnotation(Local.class) != null;
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        Duration futureTimeout =
                RpcGatewayUtils.extractRpcTimeout(parameterAnnotations, args, timeout);

        final RpcInvocation rpcInvocation =
                createRpcInvocationMessage(
                        method.getDeclaringClass().getSimpleName(),
                        methodName,
                        isLocalRpcInvocation,
                        parameterTypes,
                        args);

        Class<?> returnType = method.getReturnType();

        final Object result;

        if (Objects.equals(returnType, Void.TYPE)) {
            tell(rpcInvocation);

            result = null;
        } else {
            // Capture the call stack. It is significantly faster to do that via an exception than
            // via Thread.getStackTrace(), because exceptions lazily initialize the stack trace,
            // initially only
            // capture a lightweight native pointer, and convert that into the stack trace lazily
            // when needed.
            final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;

            // execute an asynchronous call
            final CompletableFuture<?> resultFuture =
                    ask(rpcInvocation, futureTimeout)
                            .thenApply(
                                    resultValue ->
                                            deserializeValueIfNeeded(
                                                    resultValue, method, flinkClassLoader));

            final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            resultFuture.whenComplete(
                    (resultValue, failure) -> {
                        if (failure != null) {
                            completableFuture.completeExceptionally(
                                    resolveTimeoutException(
                                            ExceptionUtils.stripCompletionException(failure),
                                            callStackCapture,
                                            address,
                                            rpcInvocation));
                        } else {
                            completableFuture.complete(resultValue);
                        }
                    });

            if (Objects.equals(returnType, CompletableFuture.class)) {
                result = completableFuture;
            } else {
                try {
                    result = completableFuture.get(futureTimeout.toMillis(), TimeUnit.MILLISECONDS);
                } catch (ExecutionException ee) {
                    throw new RpcException(
                            "Failure while obtaining synchronous RPC result.",
                            ExceptionUtils.stripExecutionException(ee));
                }
            }
        }

        return result;
    }
}

3.3.4 PekkoRpcService 类

public class PekkoRpcService implements RpcService {


    @Override
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");

        final SupervisorActor.ActorRegistration actorRegistration = registerRpcActor(rpcEndpoint);
        final ActorRef actorRef = actorRegistration.getActorRef();
        final CompletableFuture<Void> actorTerminationFuture =
                actorRegistration.getTerminationFuture();

        LOG.info(
                "Starting RPC endpoint for {} at {} .",
                rpcEndpoint.getClass().getName(),
                actorRef.path());

        final String address = PekkoUtils.getRpcURL(actorSystem, actorRef);
        final String hostname;
        Option<String> host = actorRef.path().address().host();
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }

        Set<Class<?>> implementedRpcGateways =
                new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(PekkoBasedEndpoint.class);

        final InvocationHandler invocationHandler;

        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // a FencedRpcEndpoint needs a FencedPekkoInvocationHandler
            invocationHandler =
                    new FencedPekkoInvocationHandler<>(
                            address,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                            captureAskCallstacks,
                            flinkClassLoader);
        } else {
            invocationHandler =
                    new PekkoInvocationHandler(
                            address,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            captureAskCallstacks,
                            flinkClassLoader);
        }

        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class . That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();

        @SuppressWarnings("unchecked")
        RpcServer server =
                (RpcServer)
                        Proxy.newProxyInstance(
                                classLoader,
                                implementedRpcGateways.toArray(
                                        new Class<?>[implementedRpcGateways.size()]),
                                invocationHandler);

        return server;
    }


}

3.3.5 PekkoRpcActor 类

PekkoInvocationHandler 中的远端调用invokeRpc 方法并没有直接调用 invoke 方法,而是将所需参数封装为 RpcInvocation 通过 tell 或 ask 发送到 PekkoRpcActor ,在该类中调用 invoke 方法(tell和ask是Pekko中的通信方式,后面会提到)。

class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {


    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
                .match(ControlMessages.class, this::handleControlMessage)
                .matchAny(this::handleMessage)
                .build();
    }

    private void handleMessage(final Object message) {
        if (state.isRunning()) {
            mainThreadValidator.enterMainThread();

            try {
                handleRpcMessage(message);
            } finally {
                mainThreadValidator.exitMainThread();
            }
        } else {
            log.info(
                    "The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
                    rpcEndpoint.getClass().getName(),
                    message);

            sendErrorIfSender(
                    new EndpointNotStartedException(
                            String.format(
                                    "Discard message %s, because the rpc endpoint %s has not been started yet.",
                                    message, getSelf().path())));
        }
    }

    protected void handleRpcMessage(Object message) {
        if (message instanceof RunAsync) {
            handleRunAsync((RunAsync) message);
        } else if (message instanceof CallAsync) {
            handleCallAsync((CallAsync) message);
        } else if (message instanceof RpcInvocation) {
            handleRpcInvocation((RpcInvocation) message);
        } else {
            log.warn(
                    "Received message of unknown type {} with value {}. Dropping this message!",
                    message.getClass().getName(),
                    message);

            sendErrorIfSender(
                    new UnknownMessageException(
                            "Received unknown message "
                                    + message
                                    + " of type "
                                    + message.getClass().getSimpleName()
                                    + '.'));
        }
    }
   
	private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        Method rpcMethod = null;

        try {
            String methodName = rpcInvocation.getMethodName();
            Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();

            rpcMethod = lookupRpcMethod(methodName, parameterTypes);
        } catch (final NoSuchMethodException e) {
            log.error("Could not find rpc method for rpc invocation.", e);

            RpcConnectionException rpcException =
                    new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
            getSender().tell(new Status.Failure(rpcException), getSelf());
        }

        if (rpcMethod != null) {
            try {
                // this supports declaration of anonymous classes
                rpcMethod.setAccessible(true);

                final Method capturedRpcMethod = rpcMethod;
                if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                    // No return value to send back
                    runWithContextClassLoader(
                            () -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()),
                            flinkClassLoader);
                } else {
                    final Object result;
                    try {
                        result =
                                runWithContextClassLoader(
                                        () ->
                                                capturedRpcMethod.invoke(
                                                        rpcEndpoint, rpcInvocation.getArgs()),
                                        flinkClassLoader);
                    } catch (InvocationTargetException e) {
                        log.debug(
                                "Reporting back error thrown in remote procedure {}", rpcMethod, e);

                        // tell the sender about the failure
                        getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
                        return;
                    }

                    final String methodName = rpcMethod.getName();
                    final boolean isLocalRpcInvocation =
                            rpcMethod.getAnnotation(Local.class) != null;

                    if (result instanceof CompletableFuture) {
                        final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
                        sendAsyncResponse(responseFuture, methodName, isLocalRpcInvocation);
                    } else {
                        sendSyncResponse(result, methodName, isLocalRpcInvocation);
                    }
                }
            } catch (Throwable e) {
                log.error("Error while executing remote procedure call {}.", rpcMethod, e);
                // tell the sender about the failure
                getSender().tell(new Status.Failure(e), getSelf());
            }
        }
    }
}

3.4 Flink RPC 底层使用动态代理做什么

  1. 动态代理用到的所有类都是 RpcGateway 的实现,也就是说创建的的 RpcGateway 接口对应实现类的动态代理,比如 ResourceManagerGateway 类;
  2. PekkoInvocationHandler 类实现 invoke 方法,是将代理类的方法,参数类型,参数封装为 RpcInvocation 对象,之后通过 Pello.tell 、Pekko.ask 方法将 RpcInvocation 作为消息发送到代理接口所在的进程中;
  3. 代理接口所在的进程中,接收到消息以后会调用对应的方法。

http://www.kler.cn/a/536614.html

相关文章:

  • [Deepseek-自定义Ollama 安装路径+lmStudio 简易安装]
  • 当今前沿技术:改变未来的核心力量
  • k8sollama部署deepseek-R1模型,内网无坑
  • 逻辑起源 - 比较DS与豆包对“逻辑”源头的提炼差异
  • Java 大视界 -- Java 大数据在智能教育中的应用与个性化学习(75)
  • AI大模型:DeepSeek
  • Docker Desktop安装到其他盘
  • 如何通过Davinci Configurator来新增一个BswM仲裁规则
  • 【diffusers极速入门(八)】GPU 显存节省(减少内存使用)技巧总结
  • Axure设计教程:动态排名图(中继器实现)
  • AIP-135 自定义方法
  • Big.js应用
  • Azure OpenAI 服务调用 DeepSeek 模型完全指南
  • 从 .NET Framework 升级到 .NET 8 后 SignalR 问题处理与解决方案
  • Facebook矩阵营销:多维度布局,精准打击
  • 力扣 无重复字符的最长子串
  • 已验证正常,Java输入字符串生成PDF文件
  • MySQL开窗函数种类和使用总结
  • 将仓库A分支同步到仓库B分支,并且同步commit提交
  • js中,正则表达式m修饰符说明
  • 数据完整性与约束的分类
  • 如何制定旅游计划:从零开始的旅行规划
  • 让相机自己决定拍哪儿!——NeRF 三维重建的主动探索之路
  • Repo vs Git:区别与优缺点
  • kafka服务端之延时操作前传--时间轮
  • docker 安装 mindoc