过程卡夫卡使用事件MATLAB
的用法MATLAB流数据框架®生产服务器™来处理卡夫卡的事件®流。该示例提供并解释recamanSum
而且initRecamanSum
处理事件流的流分析函数和demoRecaman
创建事件流、验证事件流创建、使用流分析函数处理事件流并将结果写入输出流的脚本。
示例函数和脚本位于
文件夹,support_package_root
\议员\流\ \数值例子
是系统上支持包的根文件夹。support_package_root
先决条件
你一定有MATLAB生产服务器流数据框架已安装在您的系统上。有关更多信息,请参见为MATLAB生产服务器安装流数据框架.
你必须有一个运行的Kafka服务器,你有必要的权限创建主题。这个例子假设你的Kafka主机的网络地址是
kafka.host.com: 9092
.
写流分析MATLAB函数
对于本例,使用示例MATLAB函数recamanSum
而且initRecamanSum
.之后,您迭代recamanSum
流函数在多个事件上计算结果。
写有状态函数
的recamanSum
函数是有状态.在有状态函数中,数据状态在事件之间共享,过去的事件可能会影响当前事件的处理方式。recamanSum
计算流变量中数值序列的累积和R
,并返回一个表cSum
和结构状态
.表cSum
中元素的累积和R
还有时间戳。结构状态
在其字段中包含序列的最终值cumsum
.
函数[cSum, state] = recamanSum(data, state) timestamp = data. properties . rowtimes;Key = data.key;sum = cumsum(data.R) + state.cumsum;状态。cumsum= sum(end); cSum = timetable(timestamp, key, sum);结束
写状态初始化命令功能
的initRecamanSum
函数的第一次迭代初始化状态recamanSum
函数。
函数state = initRecamanSum(config)状态。Cumsum = 0;结束
创建示例流事件
要运行该示例,需要示例流数据。的demoRecaman
脚本包含以下代码,用于创建由Recamán序列的前1000个元素组成的流数据,还包含将序列写入Kafka主题的代码recamanSum_data
.
设置Kafka主机名和端口号。
kafkaHost =“kafka.host.com”;kafkaPort = 9092;
创建Recamán序列的前1000个元素。
要创建序列,可以使用以下命令
recamanTimeTable
函数也位于\ \数值例子
文件夹中。recamanTimeTable
创建包含第一个的时间表N
Recamán序列的元素。函数= recamanTimeTable(N) rs = 0 (1,N);为N = k-1;相减= rs(k-1) - n;如果减去> 0 && any(rs == Subtract) == false rs(k) = Subtract;其他的Rs (k) = Rs (k-1) + n;结束结束incr = seconds(1:N);thisVeryInstant =...convertTo (datetime,“epochtime”,“时代”,“1970-1-1”);thisVeryInstant = datetime(thisVeryInstant,“ConvertFrom”,...“epochtime”,“时代”,“1970-1-1”);thisVeryInstant。时区=“UTC”;timestamp = (thisVeryInstant - seconds(N)) + incr';key = (0:N-1)';键=字符串(键);R = rs';tt =时间表(时间戳,R,键);结束
的结果
recamanTimeTable
在时间表上。tt0 = recamanTimeTable(1000);
控件连接的流对象
recamanSum_data
的话题。稍后,您将编写包含Recamán序列的时间表recamanSum_data
.dataKS = kafkaStream(kafkaHost, kafkaPort,“recamanSum_data”行= 100);
如果
recamanSum_data
主题已存在,请删除它。试一试deleteTopic (dataKS);抓,结束
将整个Recamán序列写入
recamanSum_data
的话题。writetimetable (dataKS tt0);
验证样例数据创建
要验证所创建的样例流事件,请确认从recamanSum_data
主题与您创建并写入的示例数据相同recamanSum_data
的话题。的demoRecaman
脚本包含以下代码。
方法读取一个数据窗口(100行)
recamanSum_data
主题放入时间表中tt1
.tt1 = readschedule (dataKS);
检查数据是否已被读入
tt1
等于您编写的Recamán序列中的前100个元素。如果Isequal (tt0(1:height(tt1),:), tt1) fprintf(1,"向主题%s.\n写入数据成功", dataKS.Name);结束
停止从
dataKS
流,因为以后你用dataKS
再读一遍recamanSum_data
的话题。不允许使用多个流从同一主题读取。停止(dataKS);
使用流分析函数处理流事件
迭代recamanSum
流分析函数多次从输入流读取数字序列,计算其累积和,并将结果写入输出流。的demoRecaman
脚本包含以下代码。
控件连接的输出流
recamanSum_results
的话题。使用recamanSum_results
的输出recamanSum
流媒体功能。resultKS = kafkaStream(kafkaHost,kafkaPort,“recamanSum_results”,...行= 100);
创建一个事件流处理器来迭代
recamanSum
连接到流的输入主题上的流函数dataKS
.将结果写入连接到流的输出主题resultKS
.使用名为的持久存储连接RR
在迭代之间存储数据状态。rsp = eventStreamProcessor(dataKS,@recamanSum,@initRecamanSum,...StateStore =“农达”OutputStream = resultKS);
执行流函数十次。因为窗口大小,或者一次读取的行数是100,所以10次迭代将消耗1000个元素的整个序列。
流(1,计算Recaman序列的累积和。\n);执行(负责,10);
删除事件流处理器。这就关闭了
StateStore
,需要在一行中多次运行此脚本。清晰的负责;
从输出流中读取结果。
流(1,"读取结果来自%s.\n", resultKS.Name);Tt2 = schedule .empty;为N = 1:10 tt2 = [tt2;readtimetable (resultKS)];结束cSum = cumsum(tt0.R);如果tt2(最终,:)。sum == cSum(end) fprintf(1,"累计和计算成功:%d.\n",...tt2(最终,:).sum);其他的流(1,"预期累积总和%d。计算%d代替。\n",...cSum(结束),tt2(最终,:).sum);结束
当你运行整个demoRecaman
脚本,您将看到以下输出。
向主题recamanSum_data写入数据成功。计算Recaman序列的累积和。读取recamanSum_results的结果。累计总和计算成功:837722。
另请参阅
readtimetable
|writetimetable
|kafkaStream
|eventStreamProcessor
|执行
|inMemoryStream
|testStream