Show / Hide Table of Contents

    有些情况下简单的消息/响应模式是不够的,并且客户端需要收到异步通知。 例如,一个用户可能想要在他的朋友发布新即时消息的时候收到通知。

    客户端观察者是一个允许异步通知客户端的机制。 一个观察者是一个继承自IGrainObserver的单向的异步接口,并且它的所有方法都是void的。 grain通过像调用grain接口的方法一样调用观察者的方法来发送一个通知给观察者,不同是观察者方法没有返回值并且grain不会依赖调用结果。 Orleans运行时将会确保单向传递通知的成功。 发布通知的grain应该提供API来添加或者删除观察者。 另外,为了方便通常会暴露一个取消已有订阅的方法。 grain的开发者可以使用Orleans的ObserverSubscriptionManager<T>泛型类来简化被观察的grain类型的开发。

    为了订阅一个通知,客户端必须首先创建一个本地的实现了观察者接口的C#对象。 然后调用然后调用观察者工厂的CreateObjectReference()方法来把C#对象转换成一个grain引用,这样就可以传递给通知grain的订阅方法了。

    这个模型也可以用作其他grain接口异步通知。 不像客户端订阅的情况,订阅的grain只要实现观察者接口,并且把自身作为一个引用传递给自己(例如this.AsReference)。

    代码例子

    让我们假设有一个grain周期性地给客户端发送消息。为了简单期间,我们例子中的消息将会是一个字符串。我们首先在客户端定义接收这个消息的接口。

    这个接口将会是这样

    public interface IChat : IGrainObserver
    {
        void ReceiveMessage(string message);
    }
    

    唯一特殊的一点是这个接口要继承自IGrainObserver。现在任何想要接受这些消息的客户端应该实现一个实现了IChat接口的类。

    最简单的例子是这样的:

    public class Chat : IChat
    {
        public void ReceiveMessage(string message)
        {
            Console.WriteLine(message);
        }
    }
    

    现在在服务器上我应该有一个发送聊天消息给客户端的grain。这个grain也应该有让客户端来订阅和退订他们自己的机制。订阅这个grain可以使用实用类ObserverSubscriptionManager:

    class HelloGrain : Grain, IHello
    {
        private ObserverSubscriptionManager<IChat> _subsManager;
    
        public override async Task OnActivateAsync()
        {
            // We created the utility at activation time.
            _subsManager = new ObserverSubscriptionManager<IChat>();
            await base.OnActivateAsync();
        }
    
        // Clients call this to subscribe.
        public async Task Subscribe(IChat observer)
        {
            _subsManager.Subscribe(observer);
        }
    
        //Also clients use this to unsubscribe themselves to no longer receive the messages.
        public async Task UnSubscribe(IChat observer)
        {
            _SubsManager.Unsubscribe(observer);
        }
    }
    

    给客户端发消息可以使用ObserverSubscriptionManager<IChat>实例Notify方法。这个方法接受一个Action<T>方法或者lambda表达式(这里的T是IChat的类型)

    public Task SendUpdateMessage(string message)
    {
        _SubsManager.Notify(s => s.ReceiveMessage(message));
        return TaskDone.Done;
    }
    

    现在我们的服务器已经有一个发送消息给观察者客户端的方法,有两个订阅/退订的方法和一个实现了一个可以观察grain消息的类的客户端。最后的异步是在客户端创建一个使用我们之前实现的Chat 类的观察者的引用并且让它在订阅后接受消息。

    代码如下:

    //首先创建grain的引用
    var friend = GrainClient.GrainFactory.GetGrain<IHello>(0);
    Chat c = new Chat();
    
    //为chat创建一个用来订阅可观察的grain的引用。
    var obj = await GrainClient.GrainFactory.CreateObjectReference<IChat>(c);
    //订阅这个实例来接受消息。
    await friend.Subscribe(obj);
    

    现在无论何时我们服务器上的grain调用SendUpdateMessage方法,所有订阅了得客户端都将受到消息。在我们的客户端代码里,Chat的实例变量c会受到消息并且打印到控制台。

    注意: 传递给CreateObjectReference的对象是通过通过一个WeakReference<T>实现并且因此如果没有其他的引用存在会被垃圾收集掉。用户应该为每一个不像被垃圾回收的观察者维护一个引用。

    注意: 对观察者的支持可能在将来会被移除并且被简单消息流SMS取代,简单消息流能更强大灵活和可靠地支持同样的功能。

    Next

    下面我们看一下开发一个grain

    • Improve this Doc
    Back to top Copyright © 2015-2016 Microsoft
    Generated by DocFX