发布时间:2025-12-09 11:53:49 浏览次数:2
import com.datatorrent.api.LocalMode; //导入依赖的package包/类@Testpublic void testApplication() throws Exception{ try { // write messages to Kafka topic Configuration conf = getConfig(); writeToTopic(); // run app asynchronously; terminate after results are checked LocalMode.Controller lc = asyncRun(conf); // check for presence of output file waitForOutputTuples(); // compare output lines to input compare(); lc.shutdown(); } catch (ConstraintViolationException e) { Assert.fail("constraint violations: " + e.getConstraintViolations()); }} import com.datatorrent.api.LocalMode; //导入依赖的package包/类@Testpublic void testApplication() throws IOException, Exception{ try { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-test.xml")); lma.prepareDAG(new Application(), conf); LocalMode.Controller lc = lma.getController(); lc.run(5000); // get messages from Kafka topic and compare with input chkOutput(); lc.shutdown(); } catch (ConstraintViolationException e) { Assert.fail("constraint violations: " + e.getConstraintViolations()); }} import com.datatorrent.api.LocalMode; //导入依赖的package包/类@Testpublic void testApplication() throws Exception{ try { Configuration conf = new Configuration(false); conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-test.xml")); /* * Run the application asynchronously and keep polling for results till timeout. */ LocalMode.Controller lc = asyncRun(conf); waitForOutputTuples(); /* * Validate the data contents of results. */ validateTuples(); lc.shutdown(); } catch (ConstraintViolationException e) { Assert.fail("constraint violations: " + e.getConstraintViolations()); }} import com.datatorrent.api.LocalMode; //导入依赖的package包/类@Testpublic void testApplication() throws Exception{ try { // write messages to Kafka topic Configuration conf = getConfig(); writeToTopic(); // run app asynchronously; terminate after results are checked LocalMode.Controller lc = asyncRun(conf); // check for presence of output file waitForOutputTuples(); // compare output lines to input compare(); cleanTable(); lc.shutdown(); } catch (ConstraintViolationException e) { Assert.fail("constraint violations: " + e.getConstraintViolations()); }}