Getting Spring-XD and the hdfs sink to work for maprfs
This question is about Spring-XD release 1.0.1 working with maprfs, which is officially not yet supported. I would still like to get it to work.
So this is what I did:
1) adjusted the xd-shell, xd-worker and xd-singlenode shell scripts to accept the parameter --hadoopDistro mapr
2) added the following libraries to a new directory $XD_HOME/lib/mapr:
    avro-1.7.4.jar
    hadoop-annotations-2.2.0.jar
    hadoop-core-1.0.3-mapr-3.0.2.jar
    hadoop-distcp-2.2.0.jar
    hadoop-hdfs-2.2.0.jar
    hadoop-mapreduce-client-core-2.2.0.jar
    hadoop-streaming-2.2.0.jar
    hadoop-yarn-api-2.2.0.jar
    hadoop-yarn-common-2.2.0.jar
    jersey-core-1.9.jar
    jersey-server-1.9.jar
    jetty-util-6.1.26.jar
    maprfs-1.0.3-mapr-3.0.2.jar
    protobuf-java-2.5.0.jar
    spring-data-hadoop-2.0.2.RELEASE-hadoop24.jar
    spring-data-hadoop-batch-2.0.2.RELEASE-hadoop24.jar
    spring-data-hadoop-core-2.0.2.RELEASE-hadoop24.jar
    spring-data-hadoop-store-2.0.2.RELEASE-hadoop24.jar
3) ran bin/xd-singlenode --hadoopDistro mapr and shell/bin/xd-shell --hadoopDistro mapr.
When creating and deploying a stream via stream create foo --definition "time | hdfs" --deploy, data is written to the file /xd/foo/foo-1.txt.tmp on maprfs. Yet when undeploying the stream, the following exception appears:
    org.springframework.data.hadoop.store.StoreException: Failed renaming from /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt; nested exception is java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
        at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:261)
        at org.springframework.data.hadoop.store.output.TextFileWriter.close(TextFileWriter.java:92)
        at org.springframework.xd.integration.hadoop.outbound.HdfsDataStoreMessageHandler.doStop(HdfsDataStoreMessageHandler.java:58)
        at org.springframework.xd.integration.hadoop.outbound.HdfsStoreMessageHandler.stop(HdfsStoreMessageHandler.java:94)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201)
        at com.sun.proxy.$Proxy120.stop(Unknown Source)
        at org.springframework.integration.endpoint.EventDrivenConsumer.doStop(EventDrivenConsumer.java:64)
        at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:100)
        at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:115)
        at org.springframework.integration.config.ConsumerEndpointFactoryBean.stop(ConsumerEndpointFactoryBean.java:303)
        at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
        at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:51)
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
        at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
        at org.springframework.context.support.DefaultLifecycleProcessor.stop(DefaultLifecycleProcessor.java:106)
        at org.springframework.context.support.AbstractApplicationContext.stop(AbstractApplicationContext.java:1186)
        at org.springframework.xd.module.core.SimpleModule.stop(SimpleModule.java:234)
        at org.springframework.xd.dirt.module.ModuleDeployer.destroyModule(ModuleDeployer.java:132)
        at org.springframework.xd.dirt.module.ModuleDeployer.handleUndeploy(ModuleDeployer.java:111)
        at org.springframework.xd.dirt.module.ModuleDeployer.undeploy(ModuleDeployer.java:83)
        at org.springframework.xd.dirt.server.ContainerRegistrar.undeployModule(ContainerRegistrar.java:261)
        at org.springframework.xd.dirt.server.ContainerRegistrar$StreamModuleWatcher.process(ContainerRegistrar.java:884)
        at org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
    Caused by: java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
        at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:805)
        at com.mapr.fs.MapRFileSystem.delete(MapRFileSystem.java:629)
        at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:258)
        ... 29 more
I had a look at the OutputStoreObjectSupport.renameFile() method. When a file on HDFS is finished, this method tries to rename the file /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt. The relevant code:
    try {
        FileSystem fs = path.getFileSystem(getConfiguration());
        boolean succeed;
        try {
            // remove an existing target first; on maprfs this call throws
            // FileNotFoundException when toPath does not exist yet
            fs.delete(toPath, false);
            log.info("Renaming path=[" + path + "] toPath=[" + toPath + "]");
            succeed = fs.rename(path, toPath);
        } catch (Exception e) {
            throw new StoreException("Failed renaming from " + path + " to " + toPath, e);
        }
        if (!succeed) {
            throw new StoreException("Failed renaming from " + path + " to " + toPath + " because hdfs returned false");
        }
    }
    // (excerpt: the surrounding method and the outer catch block are not shown)
When the target file does not exist, maprfs seems to throw an exception as soon as fs.delete(toPath, false) is called. Yet throwing an exception in this case does not make sense: there is simply nothing to delete. I assume that other FileSystem implementations behave differently, but that is a point I still need to verify. Unfortunately I cannot find the sources of MapRFileSystem.java. Is it closed source? Access to it would help me better understand the issue. Does anyone have experience with writing from Spring-XD to maprfs? Or with renaming files on maprfs with spring-data-hadoop?
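The suspected problem and one way around it can be sketched without a cluster. In the minimal sketch below, SimpleFs is a hypothetical interface standing in for the three org.apache.hadoop.fs.FileSystem calls involved (exists, delete, rename), and MapRLikeFs is a hypothetical in-memory fake that, like maprfs in the trace above, throws FileNotFoundException from delete() on a missing path. Guarding the delete with an exists() check lets the rename succeed whether or not the target is already there.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class GuardedRename {

    /** Hypothetical stand-in for the FileSystem calls used by renameFile(); not a Hadoop type. */
    interface SimpleFs {
        boolean exists(String path) throws IOException;
        boolean delete(String path) throws IOException;
        boolean rename(String from, String to) throws IOException;
    }

    /** Hypothetical in-memory fake mimicking the observed maprfs behaviour:
     *  delete() on a missing path throws instead of returning false. */
    static class MapRLikeFs implements SimpleFs {
        final Map<String, String> files = new HashMap<>();

        public boolean exists(String path) {
            return files.containsKey(path);
        }

        public boolean delete(String path) throws IOException {
            if (!files.containsKey(path)) {
                throw new FileNotFoundException("Requested file " + path + " does not exist.");
            }
            files.remove(path);
            return true;
        }

        public boolean rename(String from, String to) {
            // like HDFS, refuse to rename onto an existing target
            if (!files.containsKey(from) || files.containsKey(to)) {
                return false;
            }
            files.put(to, files.remove(from));
            return true;
        }
    }

    /** Delete the target only if it exists, then rename the in-writing file onto it. */
    static boolean renameReplacing(SimpleFs fs, String from, String to) throws IOException {
        if (fs.exists(to)) {
            fs.delete(to);
        }
        return fs.rename(from, to);
    }

    public static void main(String[] args) throws IOException {
        MapRLikeFs fs = new MapRLikeFs();
        fs.files.put("/xd/foo/foo-1.txt.tmp", "data");
        // an unguarded fs.delete("/xd/foo/foo-1.txt") would throw here
        System.out.println(renameReplacing(fs, "/xd/foo/foo-1.txt.tmp", "/xd/foo/foo-1.txt"));
        System.out.println(fs.files.keySet());
    }
}
```

Against the real Hadoop API the same guard would read if (fs.exists(toPath)) { fs.delete(toPath, false); }, assuming MapRFileSystem.exists() returns false for a missing path rather than throwing. The check-then-delete pair is not atomic; catching FileNotFoundException around the delete alone is an alternative that also saves a round trip.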
EDIT
I managed to reproduce the issue outside of Spring-XD with a simple test case (see below). Note that the exception is only thrown if inWritingSuffix or inWritingPrefix is set; otherwise spring-hadoop does not attempt to rename the file at all. That gives me a workaround, though a somewhat unsatisfactory one: refrain from using in-writing prefixes and suffixes.
    @ContextConfiguration("context.xml")
    @RunWith(SpringJUnit4ClassRunner.class)
    public class MaprfsSinkTest {

        @Autowired
        Configuration configuration;

        @Autowired
        FileSystem filesystem;

        @Autowired
        DataStoreWriter<String> storeWriter;

        @Test
        public void testRenameOnMaprfs() throws IOException, InterruptedException {
            Path testPath = new Path("/tmp/foo.txt");
            filesystem.delete(testPath, true);
            TextFileWriter writer = new TextFileWriter(configuration, testPath, null);
            // setting an in-writing suffix makes close() attempt the rename that fails on maprfs
            writer.setInWritingSuffix("tmp");
            writer.write("some entity");
            writer.close();
        }

        @Test
        public void testStoreWriter() throws IOException {
            this.storeWriter.write("something");
        }
    }
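Why the failure depends on the in-writing prefix or suffix can be illustrated with a rough, pure-JDK sketch of the rename decision: if neither is configured, the file is already written under its final name and no rename (and therefore no delete) happens; otherwise the prefix and suffix are stripped to obtain the target name. finalName is a hypothetical helper written for illustration, not spring-hadoop's actual implementation.

```java
import java.util.Optional;

public class InWritingName {

    /**
     * Hypothetical helper: derive the final file name from the in-writing name.
     * Returns Optional.empty() when neither prefix nor suffix is configured,
     * meaning no rename is needed.
     */
    static Optional<String> finalName(String name, String prefix, String suffix) {
        boolean hasPrefix = prefix != null && !prefix.isEmpty();
        boolean hasSuffix = suffix != null && !suffix.isEmpty();
        if (!hasPrefix && !hasSuffix) {
            return Optional.empty(); // written under its final name; nothing to rename
        }
        String result = name;
        if (hasPrefix && result.startsWith(prefix)) {
            result = result.substring(prefix.length());
        }
        if (hasSuffix && result.endsWith(suffix)) {
            result = result.substring(0, result.length() - suffix.length());
        }
        return Optional.of(result);
    }

    public static void main(String[] args) {
        System.out.println(finalName("foo-1.txt.tmp", null, ".tmp")); // Optional[foo-1.txt]
        System.out.println(finalName("_foo-1.txt", "_", null));       // Optional[foo-1.txt]
        System.out.println(finalName("foo-1.txt", null, null));       // Optional.empty
    }
}
```

Only the non-empty branch leads to the delete-then-rename sequence, which is consistent with the exception disappearing once no in-writing prefix or suffix is set.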
I created a new branch of spring-hadoop that supports maprfs:
https://github.com/blinse/spring-hadoop/tree/origin/2.0.2.release-mapr
Building this release and using the resulting jar works fine with the hdfs sink.