主要内容

过程卡夫卡使用事件MATLAB

的用法MATLAB流数据框架®生产服务器™来处理卡夫卡的事件®流。该示例提供并解释recamanSum而且initRecamanSum处理事件流的流分析函数和demoRecaman创建事件流、验证事件流创建、使用流分析函数处理事件流并将结果写入输出流的脚本。

示例函数和脚本位于support_package_root\议员\流\ \数值例子文件夹,support_package_root是系统上支持包的根文件夹。

先决条件

  • 你一定有MATLAB生产服务器流数据框架已安装在您的系统上。有关更多信息,请参见为MATLAB生产服务器安装流数据框架

  • 你必须有一个运行的Kafka服务器,你有必要的权限创建主题。这个例子假设你的Kafka主机的网络地址是kafka.host.com: 9092

写流分析MATLAB函数

对于本例,使用示例MATLAB函数recamanSum而且initRecamanSum.之后,您迭代recamanSum流函数在多个事件上计算结果。

写有状态函数

recamanSum函数是有状态.在有状态函数中,数据状态在事件之间共享,过去的事件可能会影响当前事件的处理方式。recamanSum计算流变量中数值序列的累积和R,并返回一个表cSum和结构状态.表cSum中元素的累积和R还有时间戳。结构状态在其字段中包含序列的最终值cumsum

函数[cSum, state] = recamanSum(data, state) timestamp = data. properties . rowtimes;Key = data.key;sum = cumsum(data.R) + state.cumsum;状态。cumsum= sum(end); cSum = timetable(timestamp, key, sum);结束

写状态初始化命令功能

initRecamanSum函数的第一次迭代初始化状态recamanSum函数。

函数state = initRecamanSum(config)状态。Cumsum = 0;结束

创建示例流事件

要运行该示例,需要示例流数据。的demoRecaman脚本包含以下代码,用于创建由Recamán序列的前1000个元素组成的流数据,还包含将序列写入Kafka主题的代码recamanSum_data

  1. 设置Kafka主机名和端口号。

    kafkaHost =“kafka.host.com”;kafkaPort = 9092;

  2. 创建Recamán序列的前1000个元素。

    要创建序列,可以使用以下命令recamanTimeTable函数也位于\ \数值例子文件夹中。recamanTimeTable创建包含第一个的时间表NRecamán序列的元素。

    函数= recamanTimeTable(N) rs = 0 (1,N);N = k-1;相减= rs(k-1) - n;如果减去> 0 && any(rs == Subtract) == false rs(k) = Subtract;其他的Rs (k) = Rs (k-1) + n;结束结束incr = seconds(1:N);thisVeryInstant =...convertTo (datetime,“epochtime”“时代”“1970-1-1”);thisVeryInstant = datetime(thisVeryInstant,“ConvertFrom”...“epochtime”“时代”“1970-1-1”);thisVeryInstant。时区=“UTC”;timestamp = (thisVeryInstant - seconds(N)) + incr';key = (0:N-1)';键=字符串(键);R = rs';tt =时间表(时间戳,R,键);结束

  3. 的结果recamanTimeTable在时间表上。

    tt0 = recamanTimeTable(1000);

  4. 控件连接的流对象recamanSum_data的话题。稍后,您将编写包含Recamán序列的时间表recamanSum_data

    dataKS = kafkaStream(kafkaHost, kafkaPort,“recamanSum_data”行= 100);

  5. 如果recamanSum_data主题已存在,请删除它。

    试一试deleteTopic (dataKS);结束

  6. 将整个Recamán序列写入recamanSum_data的话题。

    writetimetable (dataKS tt0);

验证样例数据创建

要验证所创建的样例流事件,请确认从recamanSum_data主题与您创建并写入的示例数据相同recamanSum_data的话题。的demoRecaman脚本包含以下代码。

  1. 方法读取一个数据窗口(100行)recamanSum_data主题放入时间表中tt1

    tt1 = readschedule (dataKS);

  2. 检查数据是否已被读入tt1等于您编写的Recamán序列中的前100个元素。

    如果Isequal (tt0(1:height(tt1),:), tt1) fprintf(1,"向主题%s.\n写入数据成功", dataKS.Name);结束

  3. 停止从dataKS流,因为以后你用dataKS再读一遍recamanSum_data的话题。不允许使用多个流从同一主题读取。

    停止(dataKS);

使用流分析函数处理流事件

迭代recamanSum流分析函数多次从输入流读取数字序列,计算其累积和,并将结果写入输出流。的demoRecaman脚本包含以下代码。

  1. 控件连接的输出流recamanSum_results的话题。使用recamanSum_results的输出recamanSum流媒体功能。

    resultKS = kafkaStream(kafkaHost,kafkaPort,“recamanSum_results”...行= 100);

  2. 创建一个事件流处理器来迭代recamanSum连接到流的输入主题上的流函数dataKS.将结果写入连接到流的输出主题resultKS.使用名为的持久存储连接RR在迭代之间存储数据状态。

    rsp = eventStreamProcessor(dataKS,@recamanSum,@initRecamanSum,...StateStore =“农达”OutputStream = resultKS);
  3. 执行流函数十次。因为窗口大小,或者一次读取的行数是100,所以10次迭代将消耗1000个元素的整个序列。

    流(1,计算Recaman序列的累积和。\n);执行(负责,10);

  4. 删除事件流处理器。这就关闭了StateStore,需要在一行中多次运行此脚本。

    清晰的负责

  5. 从输出流中读取结果。

    流(1,"读取结果来自%s.\n", resultKS.Name);Tt2 = schedule .empty;N = 1:10 tt2 = [tt2;readtimetable (resultKS)];结束cSum = cumsum(tt0.R);如果tt2(最终,:)。sum == cSum(end) fprintf(1,"累计和计算成功:%d.\n"...tt2(最终,:).sum);其他的流(1,"预期累积总和%d。计算%d代替。\n"...cSum(结束),tt2(最终,:).sum);结束

当你运行整个demoRecaman脚本,您将看到以下输出。

向主题recamanSum_data写入数据成功。计算Recaman序列的累积和。读取recamanSum_results的结果。累计总和计算成功:837722。

另请参阅

||||||

相关的话题

Baidu
map