Getting Spring-XD and the hdfs sink to work for maprfs


This question is about getting Spring XD release 1.0.1 to work with maprfs. That combination is officially not yet supported, but I would like to make it work anyway.

So I did the following:

1) Adjusted the xd-shell, xd-worker and xd-singlenode shell scripts to accept the parameter --hadoopDistro mapr.

2) Added the following libraries to a new directory, $XD_HOME/lib/mapr:

    avro-1.7.4.jar
    hadoop-annotations-2.2.0.jar
    hadoop-core-1.0.3-mapr-3.0.2.jar
    hadoop-distcp-2.2.0.jar
    hadoop-hdfs-2.2.0.jar
    hadoop-mapreduce-client-core-2.2.0.jar
    hadoop-streaming-2.2.0.jar
    hadoop-yarn-api-2.2.0.jar
    hadoop-yarn-common-2.2.0.jar
    jersey-core-1.9.jar
    jersey-server-1.9.jar
    jetty-util-6.1.26.jar
    maprfs-1.0.3-mapr-3.0.2.jar
    protobuf-java-2.5.0.jar
    spring-data-hadoop-2.0.2.RELEASE-hadoop24.jar
    spring-data-hadoop-batch-2.0.2.RELEASE-hadoop24.jar
    spring-data-hadoop-core-2.0.2.RELEASE-hadoop24.jar
    spring-data-hadoop-store-2.0.2.RELEASE-hadoop24.jar

3) Ran bin/xd-singlenode --hadoopDistro mapr and shell/bin/xd-shell --hadoopDistro mapr.

When I create and deploy a stream via stream create foo --definition "time | hdfs" --deploy, data is written to the file /xd/foo/foo-1.txt.tmp on maprfs. Yet when I undeploy the stream, the following exception appears:

    org.springframework.data.hadoop.store.StoreException: Failed renaming /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt; nested exception is java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
        at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:261)
        at org.springframework.data.hadoop.store.output.TextFileWriter.close(TextFileWriter.java:92)
        at org.springframework.xd.integration.hadoop.outbound.HdfsDataStoreMessageHandler.doStop(HdfsDataStoreMessageHandler.java:58)
        at org.springframework.xd.integration.hadoop.outbound.HdfsStoreMessageHandler.stop(HdfsStoreMessageHandler.java:94)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201)
        at com.sun.proxy.$Proxy120.stop(Unknown Source)
        at org.springframework.integration.endpoint.EventDrivenConsumer.doStop(EventDrivenConsumer.java:64)
        at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:100)
        at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:115)
        at org.springframework.integration.config.ConsumerEndpointFactoryBean.stop(ConsumerEndpointFactoryBean.java:303)
        at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
        at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:51)
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
        at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
        at org.springframework.context.support.DefaultLifecycleProcessor.stop(DefaultLifecycleProcessor.java:106)
        at org.springframework.context.support.AbstractApplicationContext.stop(AbstractApplicationContext.java:1186)
        at org.springframework.xd.module.core.SimpleModule.stop(SimpleModule.java:234)
        at org.springframework.xd.dirt.module.ModuleDeployer.destroyModule(ModuleDeployer.java:132)
        at org.springframework.xd.dirt.module.ModuleDeployer.handleUndeploy(ModuleDeployer.java:111)
        at org.springframework.xd.dirt.module.ModuleDeployer.undeploy(ModuleDeployer.java:83)
        at org.springframework.xd.dirt.server.ContainerRegistrar.undeployModule(ContainerRegistrar.java:261)
        at org.springframework.xd.dirt.server.ContainerRegistrar$StreamModuleWatcher.process(ContainerRegistrar.java:884)
        at org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
    Caused by: java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
        at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:805)
        at com.mapr.fs.MapRFileSystem.delete(MapRFileSystem.java:629)
        at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:258)
        ... 29 more

I had a look at the OutputStoreObjectSupport.renameFile() method. When a file on HDFS is finished, this method tries to rename /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt. The relevant code:

    try {
        FileSystem fs = path.getFileSystem(getConfiguration());

        boolean succeed;
        try {
            // Any existing target is deleted first; on maprfs this call throws
            // FileNotFoundException when the target does not exist yet.
            fs.delete(toPath, false);
            log.info("Renaming path=[" + path + "] toPath=[" + toPath + "]");
            succeed = fs.rename(path, toPath);
        } catch (Exception e) {
            throw new StoreException("Failed renaming " + path + " to " + toPath, e);
        }
        if (!succeed) {
            throw new StoreException("Failed renaming " + path + " to " + toPath + " because hdfs returned false");
        }
    }
    // (the catch block of the outer try is omitted in this excerpt)

When the target file does not exist, maprfs seems to throw an exception as soon as fs.delete(toPath, false) is called. Throwing an exception in this case does not make sense to me. I assume other file system implementations behave differently, but that is a point I still need to verify. Unfortunately, I cannot find the sources of MapRFileSystem.java. Is it closed source? The sources would help me better understand the issue. Does anyone have experience with writing from Spring XD to maprfs, or with renaming files on maprfs using spring-data-hadoop?
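To make the difference concrete, the following self-contained sketch models the delete-then-rename sequence from renameFile() against a small in-memory file system. MiniFs, InMemoryFs, deleteThenRename and tolerantRename are hypothetical names for illustration, not Hadoop or spring-hadoop API. The throwOnMissingDelete flag switches between HDFS-style behaviour (delete on a missing path returns false) and the maprfs behaviour observed above (delete throws FileNotFoundException).

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RenameSketch {

    // Hypothetical stand-in for the FileSystem calls used by renameFile().
    interface MiniFs {
        boolean delete(String path, boolean recursive) throws IOException;
        boolean rename(String from, String to) throws IOException;
        boolean exists(String path);
    }

    // In-memory file system. throwOnMissingDelete = false mimics HDFS (return false),
    // true mimics the maprfs behaviour reported in the question (throw).
    static class InMemoryFs implements MiniFs {
        final Map<String, String> files = new HashMap<>();
        final boolean throwOnMissingDelete;

        InMemoryFs(boolean throwOnMissingDelete) {
            this.throwOnMissingDelete = throwOnMissingDelete;
        }

        public boolean delete(String path, boolean recursive) throws IOException {
            if (files.remove(path) != null) {
                return true;
            }
            if (throwOnMissingDelete) {
                throw new FileNotFoundException("Requested file " + path + " does not exist.");
            }
            return false;
        }

        public boolean rename(String from, String to) {
            // Like HDFS, refuse to rename a missing source or onto an existing file.
            if (!files.containsKey(from) || files.containsKey(to)) {
                return false;
            }
            files.put(to, files.remove(from));
            return true;
        }

        public boolean exists(String path) {
            return files.containsKey(path);
        }
    }

    // Same sequence as the excerpt: unconditional delete of the target, then rename.
    static boolean deleteThenRename(MiniFs fs, String from, String to) throws IOException {
        fs.delete(to, false);
        return fs.rename(from, to);
    }

    // Defensive variant: only delete the target if it is actually there.
    static boolean tolerantRename(MiniFs fs, String from, String to) throws IOException {
        if (fs.exists(to)) {
            fs.delete(to, false);
        }
        return fs.rename(from, to);
    }

    public static void main(String[] args) throws IOException {
        InMemoryFs mapr = new InMemoryFs(true);
        mapr.files.put("/xd/foo/foo-1.txt.tmp", "data");
        try {
            deleteThenRename(mapr, "/xd/foo/foo-1.txt.tmp", "/xd/foo/foo-1.txt");
        } catch (FileNotFoundException e) {
            System.out.println("delete-then-rename failed: " + e.getMessage());
        }
        System.out.println("tolerant rename succeeded: "
                + tolerantRename(mapr, "/xd/foo/foo-1.txt.tmp", "/xd/foo/foo-1.txt"));
    }
}
```

The exists() check leaves a small race window if something creates the target between the check and the delete; catching FileNotFoundException around the delete call is an alternative with the same effect here.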

Edit

I managed to reproduce the issue outside of Spring XD with a simple test case (see below). Note that the exception is only thrown if inWritingSuffix or inWritingPrefix is set; otherwise spring-hadoop does not attempt to rename the file. That gives me a workaround, albeit a somewhat unsatisfactory one: refrain from using in-writing prefixes and suffixes.

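    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.hadoop.store.DataStoreWriter;
    import org.springframework.data.hadoop.store.output.TextFileWriter;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    @ContextConfiguration("context.xml")
    @RunWith(SpringJUnit4ClassRunner.class)
    public class MaprfsSinkTest {

        @Autowired
        Configuration configuration;

        @Autowired
        FileSystem filesystem;

        @Autowired
        DataStoreWriter<String> storeWriter;

        @Test
        public void testRenameOnMaprfs() throws IOException, InterruptedException {
            Path testPath = new Path("/tmp/foo.txt");
            filesystem.delete(testPath, true);
            TextFileWriter writer = new TextFileWriter(configuration, testPath, null);
            // Setting an in-writing suffix makes close() rename the file, which triggers the failure.
            writer.setInWritingSuffix("tmp");
            writer.write("some entity");
            writer.close();
        }

        @Test
        public void testStoreWriter() throws IOException {
            this.storeWriter.write("something");
        }
    }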
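The condition described above, that a rename (and therefore the failing delete) only happens when an in-writing prefix or suffix is configured, can be sketched as a small pure function. finalName is a hypothetical helper written for illustration; the exact name handling inside spring-hadoop may differ.

```java
import java.util.Optional;

public class InWritingNames {

    // Hypothetical helper: returns the final file name after stripping the in-writing
    // prefix/suffix, or empty when neither is configured (no rename is attempted).
    static Optional<String> finalName(String writingName, String prefix, String suffix) {
        boolean hasPrefix = prefix != null && !prefix.isEmpty();
        boolean hasSuffix = suffix != null && !suffix.isEmpty();
        if (!hasPrefix && !hasSuffix) {
            return Optional.empty(); // file is written under its final name already
        }
        String name = writingName;
        if (hasPrefix && name.startsWith(prefix)) {
            name = name.substring(prefix.length());
        }
        if (hasSuffix && name.endsWith(suffix)) {
            name = name.substring(0, name.length() - suffix.length());
        }
        return Optional.of(name);
    }

    public static void main(String[] args) {
        System.out.println(finalName("foo-1.txt.tmp", null, ".tmp")); // Optional[foo-1.txt]
        System.out.println(finalName("foo-1.txt", null, null));       // Optional.empty
    }
}
```

With neither prefix nor suffix set, the writer never reaches the delete-then-rename step, which is why the workaround avoids the maprfs exception.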

I created a new branch of spring-hadoop that supports maprfs:

https://github.com/blinse/spring-hadoop/tree/origin/2.0.2.release-mapr

Building this release and using the resulting jar works fine with the hdfs sink.

