Java LocalMode类使用实例

发布时间:2025-12-09 11:53:49 浏览次数:2

实例1: testApplication

import com.datatorrent.api.LocalMode; //导入依赖的package包/类@Testpublic void testApplication() throws Exception{  try {    // write messages to Kafka topic    Configuration conf = getConfig();    writeToTopic();    // run app asynchronously; terminate after results are checked    LocalMode.Controller lc = asyncRun(conf);    // check for presence of output file    waitForOutputTuples();    // compare output lines to input    compare();    lc.shutdown();  } catch (ConstraintViolationException e) {    Assert.fail("constraint violations: " + e.getConstraintViolations());  }} 

实例2: testApplication

import com.datatorrent.api.LocalMode; //导入依赖的package包/类@Testpublic void testApplication() throws IOException, Exception{  try {    LocalMode lma = LocalMode.newInstance();    Configuration conf = new Configuration(false);    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-test.xml"));        lma.prepareDAG(new Application(), conf);    LocalMode.Controller lc = lma.getController();    lc.run(5000);    // get messages from Kafka topic and compare with input    chkOutput();    lc.shutdown();  } catch (ConstraintViolationException e) {    Assert.fail("constraint violations: " + e.getConstraintViolations());  }} 

实例3: testApplication

import com.datatorrent.api.LocalMode; //导入依赖的package包/类@Testpublic void testApplication() throws Exception{  try {    Configuration conf = new Configuration(false);    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-test.xml"));    /*     * Run the application asynchronously and keep polling for results till timeout.     */    LocalMode.Controller lc = asyncRun(conf);    waitForOutputTuples();    /*     * Validate the data contents of results.     */    validateTuples();    lc.shutdown();  } catch (ConstraintViolationException e) {    Assert.fail("constraint violations: " + e.getConstraintViolations());  }} 

实例4: testApplication

import com.datatorrent.api.LocalMode; //导入依赖的package包/类@Testpublic void testApplication() throws Exception{  try {    // write messages to Kafka topic    Configuration conf = getConfig();    writeToTopic();    // run app asynchronously; terminate after results are checked    LocalMode.Controller lc = asyncRun(conf);    // check for presence of output file    waitForOutputTuples();    // compare output lines to input    compare();    cleanTable();    lc.shutdown();  } catch (ConstraintViolationException e) {    Assert.fail("constraint violations: " + e.getConstraintViolations());  }} 
localmode
需要做网站?需要网络推广?欢迎咨询客户经理 13272073477