Orleans流快速入门
这个指南将会展示设置和使用Orleans Streams的快速方法。 阅读这个文档的其他部分来学习更多关于流的细节。
必要的设置
在这个指导中,我们用一个使用了grain消息给订阅者发送流数据的简单消息流。使用内存存储提供者存储订阅者列表,尽管在实际生产环境中不是一个明智的选择。
<Globals>
<StorageProviders>
<Provider Type="Orleans.Storage.MemoryStorage" Name="Default" />
<Provider Type="Orleans.Storage.MemoryStorage" Name="PubSubStore" />
</StorageProviders>
<StreamProviders>
<Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/>
</StreamProviders>
现在我们可以创建流,使用它们作为生产者发送数据同时作为订阅者接收数据。
创建事件
对于流来说创建事件相对简单。你首先需要访问到你上面再配置文件中(SMSProvider
) 定义的流提供者,并且选择一个流推送数据给它。
//Pick a guid for a chat room grain and chat room stream
var guid = some guid identifying the chat room
//Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
//Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
你能看到我们的流有一个GUID和一个命名空间。这使得鉴别唯一的流变得容易。例如,在一个聊天室中,命名空间是"Rooms"并且GUID是特有的RoomGrain的GUID。
这里我们使用一些已知的聊天室的GUID。现在可以使用流的OnNext
方法给它推送数据。我们在一个定时器中做这些并且使用随机数字。你也可以使用任何其他的数据类型。
RegisterTimer(s =>
{
return stream.OnNextAsync(new System.Random().Next());
}, null, TimeSpan.FromMilliseconds(1000), TimeSpan.FromMilliseconds(1000));
订阅和接收流数据
接收数据我们可以使用显式/隐式的订阅,这在本手册的其它页会详细介绍。这里我们使用较简单的隐式订阅。当一个grain类型想要隐式地描述一个流的时候,它使用ImplicitStreamSubscription (namespace)]
特性。
在我们的例子中,我们这样定义一个ReceiverGrain:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
现在无论何时给命名空间是RANDOMDATA的流推送数据,具有相同guid类型是ReceiverGrain
的grain将会收到消息。甚至当前没有激活的grain存在,运行时会自动创建一个新的并且发送消息给它。
为了让这个工作,我们需要通过设置我们的接受数据的OnNext
方法完成订阅的过程。ReceiverGrain
应该在它的OnActivateAsync
有类似如下的调用。
//Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
//Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
//Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
//Set our OnNext method to the lambda which simply prints the data, this doesn't make new subscriptions
await stream.SubscribeAsync<int>(async (data, token) => Console.WriteLine(data));
所有的都设置好了。现在唯一需要的是触发我们的生产者grain创建并且之后它将注册定时器并且开始发送随机整数给所有感兴趣的订阅者。
此外,这个指南跳过了许多细节并且只是展示了重点。阅读手册的其他部分和其他的关于RX的资源来更好地理解可以和如何干什么。
响应式编程是解决许多问题的有效方式。例如你可以在订阅者中使用LINQ来过滤数字和各式各样有趣的事。