有些情况下简单的消息/响应模式是不够的,并且客户端需要收到异步通知。 例如,一个用户可能想要在他的朋友发布新即时消息的时候收到通知。
客户端观察者是一个允许异步通知客户端的机制。
一个观察者是一个继承自IGrainObserver
的单向的异步接口,并且它的所有方法都是void的。
grain通过像调用grain接口的方法一样调用观察者的方法来发送一个通知给观察者,不同是观察者方法没有返回值并且grain不会依赖调用结果。
Orleans运行时将会确保单向传递通知的成功。
发布通知的grain应该提供API来添加或者删除观察者。
另外,为了方便通常会暴露一个取消已有订阅的方法。
grain的开发者可以使用Orleans的ObserverSubscriptionManager<T>
泛型类来简化被观察的grain类型的开发。
为了订阅一个通知,客户端必须首先创建一个本地的实现了观察者接口的C#对象。
然后调用然后调用观察者工厂的CreateObjectReference()
方法来把C#对象转换成一个grain引用,这样就可以传递给通知grain的订阅方法了。
这个模型也可以用作其他grain接口异步通知。
不像客户端订阅的情况,订阅的grain只要实现观察者接口,并且把自身作为一个引用传递给自己(例如this.AsReference
让我们假设有一个grain周期性地给客户端发送消息。为了简单期间,我们例子中的消息将会是一个字符串。我们首先在客户端定义接收这个消息的接口。 这个接口将会是这样 唯一特殊的一点是这个接口要继承自 最简单的例子是这样的: 现在在服务器上我应该有一个发送聊天消息给客户端的grain。这个grain也应该有让客户端来订阅和退订他们自己的机制。订阅这个grain可以使用实用类 给客户端发消息可以使用 现在我们的服务器已经有一个发送消息给观察者客户端的方法,有两个订阅/退订的方法和一个实现了一个可以观察grain消息的类的客户端。最后的异步是在客户端创建一个使用我们之前实现的 代码如下: 现在无论何时我们服务器上的grain调用 注意: 传递给 注意: 对观察者的支持可能在将来会被移除并且被简单消息流SMS取代,简单消息流能更强大灵活和可靠地支持同样的功能。 下面我们看一下开发一个grain代码例子
public interface IChat : IGrainObserver
{
void ReceiveMessage(string message);
}
IGrainObserver
。现在任何想要接受这些消息的客户端应该实现一个实现了IChat
接口的类。public class Chat : IChat
{
public void ReceiveMessage(string message)
{
Console.WriteLine(message);
}
}
ObserverSubscriptionManager
:class HelloGrain : Grain, IHello
{
private ObserverSubscriptionManager<IChat> _subsManager;
public override async Task OnActivateAsync()
{
// We created the utility at activation time.
_subsManager = new ObserverSubscriptionManager<IChat>();
await base.OnActivateAsync();
}
// Clients call this to subscribe.
public async Task Subscribe(IChat observer)
{
_subsManager.Subscribe(observer);
}
//Also clients use this to unsubscribe themselves to no longer receive the messages.
public async Task UnSubscribe(IChat observer)
{
_SubsManager.Unsubscribe(observer);
}
}
ObserverSubscriptionManager<IChat>
实例Notify
方法。这个方法接受一个Action<T>
方法或者lambda表达式(这里的T是IChat
的类型)public Task SendUpdateMessage(string message)
{
_SubsManager.Notify(s => s.ReceiveMessage(message));
return TaskDone.Done;
}
Chat
类的观察者的引用并且让它在订阅后接受消息。//首先创建grain的引用
var friend = GrainClient.GrainFactory.GetGrain<IHello>(0);
Chat c = new Chat();
//为chat创建一个用来订阅可观察的grain的引用。
var obj = await GrainClient.GrainFactory.CreateObjectReference<IChat>(c);
//订阅这个实例来接受消息。
await friend.Subscribe(obj);
SendUpdateMessage
方法,所有订阅了得客户端都将受到消息。在我们的客户端代码里,Chat
的实例变量c
会受到消息并且打印到控制台。CreateObjectReference
的对象是通过通过一个WeakReference<T>
实现并且因此如果没有其他的引用存在会被垃圾收集掉。用户应该为每一个不像被垃圾回收的观察者维护一个引用。Next