Xml Processing with MapReduce/Spark using an Xml StaX Parser

By | November 20, 2018

XmlStaxInputFormat / XmlStaxFileRecordReader Github Project – https://github.com/gss2002/xml-stax-mr

After some time it seemed like a gap that existed with Hadoop MapReduce and Spark that the existing XmlInputFormat classes from Mahout were using fseek and searching for strings as the file is read in from HDFS. The ability to break up a large Xml file becomes extremely important in Hadoop as it would be impossible to get a large multi-gigabyte Xml file into a DOM Object and to parse through it efficiently especially if the Xml is complex. The reasons behind this can be as simple as converting some Element or Attribute value from Celsius to Fahrenheit or to convert clear text value to a tokenized/encrypted value using MapReduce/Spark. So the long and short is I decided to create two new java classes as a MapReduce InputFormat and RecordReader: XmlStaxInputFormat.java and XmlStaxFileRecordReader.java. 

My specific use cases were un-structured xml tags inside a folder that need to be converted or tokenized to some other value for specific Xml Elements/Attributes. The way the XmlStaxInputFormat works is the splits are based on files in a folder and an xml_start_tag like in my example “METAR” determines beginning of a new record.

In the XmlStaxRecordReader.java I make sure to set the XMLInputFactory to COALESCING=TRUE as this will allow working with unstructured records. I also wrap the HDFS FSDataInputStream with BufferedInputStream to read the files from HDFS and than I wrap the Xml tags being read with a random parent tag by using an Array and passing it into SequenceInputStream to create a concatenatedInputStream in the initialize() method of XmlStaxRecordReader.

// Wrap up XML with temporary element for XML that may not be full structured and concat it
factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
List<InputStream> streams = Arrays.asList(new ByteArrayInputStream(“<wrapxmlrootup>”.getBytes()), bif,
new ByteArrayInputStream(“</wrapxmlrootup>”.getBytes()));
concatInputStream = new SequenceInputStream(Collections.enumeration(streams));

I than proceed to search for the start xmlElement tag in the XmlStaxRecordReader in the nextKeyValue() method and pass the dataset as a string off which can than be processed in FormatMapper class.

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// This breaks record up by search for the startTag Element and passes it to mapper
try {
while (reader.hasNext()) {
reader.next();
switch (reader.getEventType()) {
case XMLEvent.START_ELEMENT: {
//Grab the XML Element that starts with a specific tag and turn it into a string object and send it to mapper
if (reader.getLocalName().equalsIgnoreCase(startTag)) {
LOG.trace(“StartXMLKey: ” + reader.getLocalName());
Transformer transformer = TransformerFactory.newInstance().newTransformer();
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, “yes”);
StringWriter stringWriter = new StringWriter();
transformer.transform(new StAXSource(reader), new StreamResult(stringWriter));
value.set(stringWriter.toString());
LOG.trace(“XML Object: ” + stringWriter.toString());
key.set(fileOutput);
nodeCount++;
return true;
}
}
}

}
} catch (IllegalArgumentException | XMLStreamException | TransformerFactoryConfigurationError
| TransformerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}

 If there are any additional questions on how this works feel free to reach out via the github project issues page.