Debatching using the FF Disassembler - with multiple header/detail records

We have an incoming flat file (FF) that is a container for multiple messages. We need to debatch it because it can be huge, over 30 MB, and feeding it to BizTalk whole may lead to Out of Memory exceptions, since flat files grow to many times their size inside the engine when they are converted to XML.

I have a flat file debatching question which I am trying to solve with the standard FF Disassembler as explained here

Inside the document container there are many messages, each of which has one header and multiple detail lines in a positional FF schema, as explained below:

This is our message structure:

Code Snippet
<documentContainer>
<message groupMaxOccurs='unbounded'>
<header maxOccurs='1'>
<detail maxOccurs='unbounded'>

Sample of two messages, each starting with a header record (every row, header or detail, is fixed length):

Code Snippet

N000000618N 0000-6310-31 00340288412<--header

N000000618N FRS000875000003565862000066600DR<-- detail

N000000618N 0008750000035658620000010055WAST<-- detail

N000000618N APP001040443000000000000000000EA<-- detail

N000000618N 0000-0393-18 00340781732<--header

N000000618N FRS000875055003564632000066600TO<-- detail

I need to debatch the many messages inside this container on a message basis. I have created separate Header and Detail schemas which validate against the instances.

All the examples of FF debatching use one header record, but we have multiple messages which start with the header record.

I tried very hard to break up the input message into multiple messages (each header with its detail records), but no luck! When I try, the disassembler recognizes the first occurrence of the header record and none of the other occurrences.
This may be because positional schemas are data agnostic, so the parser is unable to recognize header rows beyond the first one. The separate Header and Detail schemas do validate on their own, though: the detail schema against a specially created message consisting of detail lines only, and the header schema likewise against header lines only. Each fails validation against the other type of message.

My test project to solve this is here, if you want to play with it. It has everything you need, including sample files, expected output and a readme.
Thanks in advance for your help!
[3953 byte] By [CodeDigger] at [2008-1-8]
# 1
Not sure, but could you change the schema and try with:
<message maxOccurs='unbounded'>...
Regards,
Leonid Ganeline
LeonidGaneline-MVP at 2007-10-2
# 2

OK, I solved it. I am posting the code to split a large flat file so that someone can benefit. The task above is now split into two pipeline components: one to split a large file and a second to custom-disassemble it. The code below is for the File Splitter pipeline component. The custom disassembler is very specific to our case, so I am not sure anyone would benefit from it, but the precedent is on CodeProject (Debatching by Extending FF Disassembler).

Code Snippet

namespace ACME.EAI.ProjectName.FileSplitter
{
using System;
using System.IO;
using System.Text;
using System.Drawing;
using System.Resources;
using System.Reflection;
using System.Diagnostics;
using System.Collections;
using System.Threading;
using System.ComponentModel;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Component;
using Microsoft.BizTalk.Messaging;
using ACME.EAI.Utils;

[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[System.Runtime.InteropServices.Guid("f9a59fc1-f021-4c39-b997-023d1a63bb0e")]
[ComponentCategory(CategoryTypes.CATID_Decoder)]
public class ProjectName_FileSplitter : Microsoft.BizTalk.Component.Interop.IComponent, IBaseComponent, IPersistPropertyBag, IComponentUI
{
private const string HEADER_START_INDEX = "01";
private System.Resources.ResourceManager resourceManager = new System.Resources.ResourceManager("ACME.EAI.ProjectName.FileSplitter.ProjectName_FileSplitter", Assembly.GetExecutingAssembly());

private int _MessageNumber;
private int _FFLineByteLength;

public int FFLineByteLength
{
get
{
return _FFLineByteLength;
}
set
{
_FFLineByteLength = value;
}
}
public int MessageNumber
{
get
{
return _MessageNumber;
}
set
{
_MessageNumber = value;
}
}

#region IBaseComponent members
/// <summary>
/// Name of the component
/// </summary>
[Browsable(false)]
public string Name
{
get
{
return resourceManager.GetString("COMPONENTNAME", System.Globalization.CultureInfo.InvariantCulture);
}
}

/// <summary>
/// Version of the component
/// </summary>
[Browsable(false)]
public string Version
{
get
{
return resourceManager.GetString("COMPONENTVERSION", System.Globalization.CultureInfo.InvariantCulture);
}
}

/// <summary>
/// Description of the component
/// </summary>
[Browsable(false)]
public string Description
{
get
{
return resourceManager.GetString("COMPONENTDESCRIPTION", System.Globalization.CultureInfo.InvariantCulture);
}
}
#endregion

#region IPersistPropertyBag members
/// <summary>
/// Gets class ID of component for usage from unmanaged code.
/// </summary>
/// <param name="classid">
/// Class ID of the component
/// </param>
public void GetClassID(out System.Guid classid)
{
classid = new System.Guid("f9a59fc1-f021-4c39-b997-023d1a63bb0e");
}

/// <summary>
/// not implemented
/// </summary>
public void InitNew()
{
}

/// <summary>
/// Loads configuration properties for the component
/// </summary>
/// <param name="pb">Configuration property bag</param>
/// <param name="errlog">Error status</param>
public virtual void Load(Microsoft.BizTalk.Component.Interop.IPropertyBag pb, int errlog)
{
object val = null;
val = this.ReadPropertyBag(pb, "MessageNumber");
if ((val != null))
{
this._MessageNumber = ((int)(val));
}
else
{
this._MessageNumber = 1000; // default
}

val = this.ReadPropertyBag(pb, "FFLineByteLength");
if ((val != null))
{
this._FFLineByteLength = ((int)(val));
}
else
{
this._FFLineByteLength = 702; // default
}
}

/// <summary>
/// Saves the current component configuration into the property bag
/// </summary>
/// <param name="pb">Configuration property bag</param>
/// <param name="fClearDirty">not used</param>
/// <param name="fSaveAllProperties">not used</param>
public virtual void Save(Microsoft.BizTalk.Component.Interop.IPropertyBag pb, bool fClearDirty, bool fSaveAllProperties)
{
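this.WritePropertyBag(pb, "MessageNumber", this.MessageNumber);
// FFLineByteLength is read in Load(), so it has to be persisted here as well
this.WritePropertyBag(pb, "FFLineByteLength", this.FFLineByteLength);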
}

#region utility functionality
/// <summary>
/// Reads property value from property bag
/// </summary>
/// <param name="pb">Property bag</param>
/// <param name="propName">Name of property</param>
/// <returns>Value of the property</returns>
private object ReadPropertyBag(Microsoft.BizTalk.Component.Interop.IPropertyBag pb, string propName)
{
object val = null;
try
{
pb.Read(propName, out val, 0);
}
catch (System.ArgumentException )
{
return val;
}
catch (System.Exception e)
{
throw new System.ApplicationException(e.Message);
}
return val;
}

/// <summary>
/// Writes property values into a property bag.
/// </summary>
/// <param name="pb">Property bag.</param>
/// <param name="propName">Name of property.</param>
/// <param name="val">Value of property.</param>
private void WritePropertyBag(Microsoft.BizTalk.Component.Interop.IPropertyBag pb, string propName, object val)
{
try
{
pb.Write(propName, ref val);
}
catch (System.Exception e)
{
throw new System.ApplicationException(e.Message);
}
}
#endregion
#endregion

#region IComponentUI members
/// <summary>
/// Component icon to use in BizTalk Editor
/// </summary>
[Browsable(false)]
public IntPtr Icon
{
get
{
return ((System.Drawing.Bitmap)(this.resourceManager.GetObject("COMPONENTICON", System.Globalization.CultureInfo.InvariantCulture))).GetHicon();
}
}

/// <summary>
/// The Validate method is called by the BizTalk Editor during the build
/// of a BizTalk project.
/// </summary>
/// <param name="obj">An Object containing the configuration properties.</param>
/// <returns>The IEnumerator enables the caller to enumerate through a collection of strings containing error messages. These error messages appear as compiler error messages. To report successful property validation, the method should return an empty enumerator.</returns>
public System.Collections.IEnumerator Validate(object obj)
{
// example implementation:
// ArrayList errorList = new ArrayList();
// errorList.Add("This is a compiler error");
// return errorList.GetEnumerator();
return null;
}
#endregion

#region IComponent members
/// <summary>
/// Implements IComponent.Execute method.
/// </summary>
/// <param name="pc">Pipeline context</param>
/// <param name="inmsg">Input message</param>
/// <returns>Original input message</returns>
/// <remarks>
/// IComponent.Execute method is used to initiate
/// the processing of the message in this pipeline component.
/// </remarks>
public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(Microsoft.BizTalk.Component.Interop.IPipelineContext pc, Microsoft.BizTalk.Message.Interop.IBaseMessage inmsg)
{
try
{
int numMessagesInFile = this._MessageNumber;
int lineByteLength = this._FFLineByteLength;
bool bMessageStart = false;
int count = 0;
int messageCounter = -1; //as the first header count will increment it to 0
string line = string.Empty;
StringBuilder sb = new StringBuilder();
//create a temporary output folder which is also the receive location of a BizTalk receive port for the main disassembly
string outputFolder = Configuration.Settings.GetValue("Split_Out_Directory");

int totalMessageNumber = getTotalMessageNumber(inmsg.BodyPart.Data );

//if the file holds no more messages than numMessagesInFile allows, there is no need to split; write it out whole and exit:
if (totalMessageNumber <= numMessagesInFile)
{
//append the stream's contents; appending the Stream object itself would only add its type name
sb.Append(new System.IO.StreamReader(inmsg.BodyPart.Data, Encoding.Default).ReadToEnd());
WriteFile(sb, outputFolder + @"\<FileMaskPattern>-1.dat");
Logging.Log("File Splitter input contained no more than " + numMessagesInFile + " messages, so it was not split and was passed directly to receive location " + outputFolder, Configuration.Settings.GetValue("ProjectName.ErrorsCategorySource"), System.Diagnostics.TraceEventType.Information);
sb = null;
return inmsg;
}

//FileStream InputFileStream = new FileStream(strInputFileFullPath, FileMode.Open);
System.IO.StreamReader reader = new System.IO.StreamReader(inmsg.BodyPart.Data, Encoding.Default);

//read line by line
while ((line = reader.ReadLine()) != null)
{
//find the header match to identify an individual message
if (line.StartsWith(HEADER_START_INDEX))
{
messageCounter++;
bMessageStart = true;
}
else
bMessageStart = false;

sb.Append(line + Environment.NewLine); //keep the number of read lines in the StringBuilder

if (messageCounter >= numMessagesInFile && (messageCounter % numMessagesInFile == 0) && bMessageStart == true) // condition for chunking
{
count++;
//remove the last line, which is a header, so that only the 'previous' messages are written
//(this assumes FFLineByteLength equals the row length in characters including the line terminator)
sb.Remove(sb.Length - lineByteLength, lineByteLength);

WriteFile(sb, outputFolder + @"\<FileMaskPattern>-" + count + ".dat");
sb.Length = 0;

// add back the last line, which is the header of the 'next' message
sb.Append(line + Environment.NewLine);

//if the remaining messages fit into one file, write the rest of the input as the last chunk
if ((totalMessageNumber - messageCounter) <= numMessagesInFile)
{
sb.Append(reader.ReadToEnd());
WriteFile(sb, outputFolder + @"\<FileMaskPattern>-Last.dat");
sb.Length = 0;
}
}
}
reader.Close();
reader = null;
sb = null;

//count numbered files plus the final '-Last' file were written
Logging.Log("File Splitter pipeline split incoming file into " + (count + 1) + " files.", Configuration.Settings.GetValue("ProjectName.ErrorsCategorySource"), System.Diagnostics.TraceEventType.Information);

}
catch (Exception ex)
{
Logging.Log("File Splitter pipeline : " + ex.Message.ToString() + Environment.NewLine + ex.StackTrace.ToString(), Configuration.Settings.GetValue("ProjectName.ErrorsCategorySource"), System.Diagnostics.TraceEventType.Error);

}
return inmsg;
}

/// <summary>
/// Calculates the total number of messages (header rows) in the input stream independently of the Execute() method
/// </summary>
/// <param name="ffData">Flat file stream; rewound afterwards when it is seekable</param>
/// <returns>Number of header rows found</returns>
private int getTotalMessageNumber(Stream ffData)
{
System.IO.StreamReader reader = new System.IO.StreamReader(ffData, Encoding.Default);

string line;
int messageCounter = 0;
while ((line = reader.ReadLine()) != null)
{
if (line.StartsWith(HEADER_START_INDEX))
{
messageCounter++;
}
}
//do not close the reader: that would also close ffData, which Execute() reads again
if (ffData.CanSeek)
{
ffData.Position = 0; //rewind so the caller can read from the start
}
return messageCounter;
}

private void WriteFile(StringBuilder sbData, string outFilePath)
{
FileStream OutputFileStream = null;
OutputFileStream = new FileStream(outFilePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None);
using (StreamWriter sw = new StreamWriter(OutputFileStream, Encoding.Default))
{
// Add some text to the file.
sw.Write(sbData.ToString());
sw.Flush();
Thread.Sleep(500);
sw.Close();
}
}

#endregion
}
}
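
For illustration, the chunking rule used in Execute() above can be isolated from the BizTalk plumbing. What follows is only a minimal sketch under stated assumptions, not the component itself: HeaderChunker, Split and the isHeader predicate are hypothetical names introduced here, and the sketch assumes that a message begins at every line the predicate accepts.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of the posted component.
public static class HeaderChunker
{
    // Groups lines into chunks that hold at most maxMessages messages each.
    // A new message starts at every line for which isHeader returns true.
    public static List<string> Split(IEnumerable<string> lines, Func<string, bool> isHeader, int maxMessages)
    {
        if (maxMessages < 1)
        {
            throw new ArgumentOutOfRangeException("maxMessages");
        }

        List<string> chunks = new List<string>();
        StringBuilder current = new StringBuilder();
        int messagesInCurrent = 0;

        foreach (string line in lines)
        {
            if (isHeader(line))
            {
                // Close the current chunk before a header that would exceed the limit
                if (messagesInCurrent == maxMessages)
                {
                    chunks.Add(current.ToString());
                    current.Length = 0;
                    messagesInCurrent = 0;
                }
                messagesInCurrent++;
            }
            current.Append(line).Append(Environment.NewLine);
        }

        // Whatever is left over becomes the last chunk
        if (current.Length > 0)
        {
            chunks.Add(current.ToString());
        }
        return chunks;
    }
}
```

Because the header is checked before the line is appended, there is no need to remove the header from the buffer afterwards, so the sketch does not depend on a configured line length. A call such as HeaderChunker.Split(lines, l => l.StartsWith("01", StringComparison.Ordinal), 1000) would mirror the defaults used above, with each returned string written to its own file.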

CodeDigger at 2007-10-2