Using DSO for parallel partition processing

I have written some code to process partitions in parallel, but I'm not seeing the performance improvements I was expecting. Plus, perfmon indicates that only 1 partition is being processed at any given moment.

Each thread is creating a separate connection to the cube. The object below is created with the first "process" method as the entry point for the thread.


using System;

using System.Collections;

using System.Threading;

using DSO;

using StarComponents;

namespace CubeProcessing

{

publicclass PartitionProcessor

{

private Stack partitions;

private Queue loggingMessages;

public PartitionProcessor(Stack partitions, Queue loggingMessages)

{

this.partitions = partitions;

this.loggingMessages = loggingMessages;

}

publicvoid process()

{

Server olapServer =new DSO.ServerClass();

Database olapDatabase;

MDStore cube =null;

try

{

string olapServerName = Constants.OLAP_SERVER;

string olapDatabaseName = Constants.OLAPDB;

olapServer.Connect(olapServerName);

if(olapServer.State == DSO.ServerStates.stateFailed)

thrownew CubeProcessingApplicationException("Failed to connect to the olap server");

if(olapServer.MDStores.Find(olapDatabaseName))

olapDatabase = (DSO.Database) olapServer.MDStores.Item(olapDatabaseName);

else

thrownew CubeProcessingApplicationException("Failed to get the database");

while (partitions.Count > 0)

{

PartitionProcessInfo process =null;

MDStore cubePartition =null;

string originalFilter =null;

try

{

process = (PartitionProcessInfo) partitions.Pop();

}

catch(InvalidOperationException)

{

continue;

}

process.StartTime = DateTime.Now;

loggingMessages.Enqueue(process.PartitionName + " - started processing (" + process.ProcessType + ")");

try

{

cube = (DSO.MDStore) olapDatabase.MDStores.Item(process.CubeName);

cubePartition = (DSO.MDStore) cube.MDStores.Item(process.PartitionName);

}

catch

{

loggingMessages.Enqueue(process.PartitionName + "Couldn't find partitionin " + process.CubeName);

continue;

}

if (cubePartition.State == OlapStateTypes.olapStateNeverProcessed)

process.ProcessType = ProcessTypes.processFull;

originalFilter = cubePartition.SourceTableFilter;

if (process.IsBEPartition)

{

CubeRefresher.setBEPartitionFilterForFullProcess(process.MeasurementPeriodKey,

process.BEId, process.LoadId, cubePartition);

this.process(olapDatabase, cube, cubePartition, process, originalFilter);

//cubePartition.Process(process.ProcessType);

CubeRefresher.setBEPartitionFilter(process.MeasurementPeriodKey, process.BEId,

cubePartition);

}

else

{

CubeRefresher.setCurrentMeasurementPeriodPartitionFilterForFullProcess(

process.MeasurementPeriodKey, process.BEId, process.LoadId, cubePartition);

this.process(olapDatabase, cube, cubePartition, process, originalFilter);

//cubePartition.Process(process.ProcessType);

CubeRefresher.setCurrentMeasurementPeriodPartitionFilter(process.MeasurementPeriodKey,

process.BEId, cubePartition);

}

loggingMessages.Enqueue(cubePartition.Name + " - set filter for partition to " + cubePartition.SourceTableFilter);

if (!cubePartition.IsValid)thrownew Exception(" Cube Partition Process failed for " +

process.PartitionName);

CubeProcessingDAO.setCubeProcessStatus(process.JobId, process.PartitionName,

CubeProcessingConstants.CUBE_PROCESS_STATUS_COMPLETED);

loggingMessages.Enqueue(process.PartitionName + " - finished processing (" + process.ProcessType + ")");

}

}

catch (Exception e)

{

loggingMessages.Enqueue("ERROR - Caught exception while processing partitions: " + e.Message);

}

finally

{

olapServer.CloseServer();

}

}

protectedvoid process(Database olapDatabase, MDStore cube, MDStore partition, PartitionProcessInfo process,string originalFilter)

{

if (process.IsIncremental)

processIncremental(olapDatabase, cube, partition, process, originalFilter);

else

partition.Process(process.ProcessType);

}

protectedvoid processIncremental(Database olapDatabase, MDStore cube, MDStore partition, PartitionProcessInfo process,string originalFilter)

{

DSO.MDStore tempCubePartition = (DSO.MDStore) cube.MDStores.AddNew("~" + partition.Name, partition.SubClassType );

partition.Clone(tempCubePartition, DSO.CloneOptions.cloneMinorChildren);

tempCubePartition.SourceTableFilter = originalFilter;

tempCubePartition.Update();

addDataToTheCubeWithLoadID(tempCubePartition, process.LoadId.ToString());

olapDatabase.Process(CubeProcessingConstants.OlapDatabaseSuspendLazyProcess);

partition.Merge(tempCubePartition.Name);

olapDatabase.Process(CubeProcessingConstants.OlapDatabaseResumeLazyProcess);

partition.SourceTableFilter = originalFilter;

partition.Update();

}

privatevoid addDataToTheCubeWithLoadID(DSO.MDStore cubePartition,string loadId)

{

string originalFilter = cubePartition.SourceTableFilter;

if (originalFilter.IndexOf("loadId") > 0 )

{

originalFilter = CubeProcessingUtil.removeLoadIdFromTheFilter(originalFilter);

loggingMessages.Enqueue(cubePartition.Name + " - originalFilter: " + originalFilter);

}

cubePartition.SourceTableFilter = originalFilter + " and " + cubePartition.SourceTable + "." + "\"loadId\"" + "=" + loadId ;

cubePartition.Update();

if (!cubePartition.IsValid)thrownew Exception("Set Source Table Filter to loadId failed for" + cubePartition.Name);

cubePartition.Process(DSO.ProcessTypes.processFull);

if (!cubePartition.IsValid)thrownew Exception("Cube Partition Process failed for " + cubePartition.Name);

cubePartition.SourceTableFilter = originalFilter;

cubePartition.Update();

if (!cubePartition.IsValid)thrownew Exception("Failed to reset Source Table Filter to nothing" + cubePartition.Name);

}

}

}


(Sorry for the poor formatting - I'm not sure why the cut & paste lost the indentation)

Here is the code that starts the threads;

int numParallelTasks = Constants.NUMBER_OF_PARALLEL_CUBE_PROCESSING_TASKS;

ArrayList threads =new ArrayList();

for (int i = 0; i < numParallelTasks; i++)

{

PartitionProcessor proc =
new PartitionProcessor(partitionsToProcess, loggingMessages);

Thread thread =new Thread(new ThreadStart(proc.process));

thread.Start();

threads.Add(thread);

}

while (threadsAreAlive(threads))

{

Thread.Sleep(0);

flushPendingMessages(loggingMessages);

}


Any ideas why perfmon would only indicate 1 partition being processed at a time?

[13874 byte] By [ScottGunn] at [2008-2-10]
# 1
Scott,

Haven't had time to trace through all of your code, but have you looked at the parallel processing utility code written by Dave Wickert? You can find it at the following download location:

http://www.microsoft.com/downloads/details.aspx?FamilyId=A2EEF773-6DF7-4688-8211-E02CF13CBDB4&displaylang=en

Taking a look at his code (written in VB6 I believe) might give you some clues...

Dave Fackler

DaveFackler at 2007-9-9 > top of Msdn Tech,SQL Server,SQL Server Analysis Services...
# 2
Yes, I have looked at it for clues. I can't find anything - the only big difference I found is that each partition is being processed by a separate process instead of just a separate thread.

Also, I have some extra logic in each thread that changes the SourceTableFilter properties of each partition and calls Update(). Perhaps the Analysis Server is blocking on some of these calls while processing is happening on other partitions?

ScottGunn at 2007-9-9 > top of Msdn Tech,SQL Server,SQL Server Analysis Services...

SQL Server

Site Classified