Using DSO for parallel partition processing
Each thread is creating a separate connection to the cube. The object below is created with the first "process" method as the entry point for the thread.
using System;using System.Collections;using System.Threading;using DSO;using StarComponents;namespace CubeProcessing{ publicclass PartitionProcessor{ private Stack partitions;private Queue loggingMessages;public PartitionProcessor(Stack partitions, Queue loggingMessages){ this.partitions = partitions;this.loggingMessages = loggingMessages;} publicvoid process(){ Server olapServer = new DSO.ServerClass();Database olapDatabase; MDStore cube = null;try{ string olapServerName = Constants.OLAP_SERVER;string olapDatabaseName = Constants.OLAPDB;olapServer.Connect(olapServerName); if(olapServer.State == DSO.ServerStates.stateFailed)thrownew CubeProcessingApplicationException("Failed to connect to the olap server");if(olapServer.MDStores.Find(olapDatabaseName))olapDatabase = (DSO.Database) olapServer.MDStores.Item(olapDatabaseName); elsethrownew CubeProcessingApplicationException("Failed to get the database");while (partitions.Count > 0){ PartitionProcessInfo process = null;MDStore cubePartition = null;string originalFilter =null;try{ process = (PartitionProcessInfo) partitions.Pop(); } catch(InvalidOperationException){ continue;} process.StartTime = DateTime.Now; loggingMessages.Enqueue(process.PartitionName + " - started processing (" + process.ProcessType + ")"); try{ cube = (DSO.MDStore) olapDatabase.MDStores.Item(process.CubeName); cubePartition = (DSO.MDStore) cube.MDStores.Item(process.PartitionName); } catch{ loggingMessages.Enqueue(process.PartitionName + "Couldn't find partitionin " + process.CubeName); continue;} if (cubePartition.State == OlapStateTypes.olapStateNeverProcessed)process.ProcessType = ProcessTypes.processFull; originalFilter = cubePartition.SourceTableFilter; if (process.IsBEPartition){ CubeRefresher.setBEPartitionFilterForFullProcess(process.MeasurementPeriodKey, process.BEId, process.LoadId, cubePartition); this.process(olapDatabase, cube, cubePartition, process, originalFilter);//cubePartition.Process(process.ProcessType);CubeRefresher.setBEPartitionFilter(process.MeasurementPeriodKey, process.BEId, cubePartition); } else{ CubeRefresher.setCurrentMeasurementPeriodPartitionFilterForFullProcess( process.MeasurementPeriodKey, process.BEId, process.LoadId, cubePartition); this.process(olapDatabase, cube, cubePartition, process, originalFilter);//cubePartition.Process(process.ProcessType);CubeRefresher.setCurrentMeasurementPeriodPartitionFilter(process.MeasurementPeriodKey, process.BEId, cubePartition); } loggingMessages.Enqueue(cubePartition.Name + " - set filter for partition to " + cubePartition.SourceTableFilter); if (!cubePartition.IsValid)thrownew Exception(" Cube Partition Process failed for " +process.PartitionName); CubeProcessingDAO.setCubeProcessStatus(process.JobId, process.PartitionName, CubeProcessingConstants.CUBE_PROCESS_STATUS_COMPLETED); loggingMessages.Enqueue(process.PartitionName + " - finished processing (" + process.ProcessType + ")"); } } catch (Exception e){ loggingMessages.Enqueue("ERROR - Caught exception while processing partitions: " + e.Message); } finally{ olapServer.CloseServer(); } } protectedvoid process(Database olapDatabase, MDStore cube, MDStore partition, PartitionProcessInfo process,string originalFilter){ if (process.IsIncremental)processIncremental(olapDatabase, cube, partition, process, originalFilter); elsepartition.Process(process.ProcessType); } protectedvoid processIncremental(Database olapDatabase, MDStore cube, MDStore partition, PartitionProcessInfo process,string originalFilter){ DSO.MDStore tempCubePartition = (DSO.MDStore) cube.MDStores.AddNew("~" + partition.Name, partition.SubClassType ); partition.Clone(tempCubePartition, DSO.CloneOptions.cloneMinorChildren); tempCubePartition.SourceTableFilter = originalFilter; tempCubePartition.Update(); addDataToTheCubeWithLoadID(tempCubePartition, process.LoadId.ToString()); olapDatabase.Process(CubeProcessingConstants.OlapDatabaseSuspendLazyProcess); partition.Merge(tempCubePartition.Name); olapDatabase.Process(CubeProcessingConstants.OlapDatabaseResumeLazyProcess); partition.SourceTableFilter = originalFilter; partition.Update(); } privatevoid addDataToTheCubeWithLoadID(DSO.MDStore cubePartition,string loadId){ string originalFilter = cubePartition.SourceTableFilter;if (originalFilter.IndexOf("loadId") > 0 ){ originalFilter = CubeProcessingUtil.removeLoadIdFromTheFilter(originalFilter); loggingMessages.Enqueue(cubePartition.Name + " - originalFilter: " + originalFilter); } cubePartition.SourceTableFilter = originalFilter + " and " + cubePartition.SourceTable + "." + "\"loadId\"" + "=" + loadId ; cubePartition.Update(); if (!cubePartition.IsValid)thrownew Exception("Set Source Table Filter to loadId failed for" + cubePartition.Name);cubePartition.Process(DSO.ProcessTypes.processFull); if (!cubePartition.IsValid)thrownew Exception("Cube Partition Process failed for " + cubePartition.Name);cubePartition.SourceTableFilter = originalFilter; cubePartition.Update(); if (!cubePartition.IsValid)thrownew Exception("Failed to reset Source Table Filter to nothing" + cubePartition.Name);} } } |
(Sorry for the poor formatting - I'm not sure why the cut & paste lost the indentation)
Here is the code that starts the threads;
int numParallelTasks = Constants.NUMBER_OF_PARALLEL_CUBE_PROCESSING_TASKS; ArrayList threads = new ArrayList();for (int i = 0; i < numParallelTasks; i++){ PartitionProcessor proc = Thread thread = new Thread(new ThreadStart(proc.process));thread.Start(); threads.Add(thread); } while (threadsAreAlive(threads)){ Thread.Sleep(0); flushPendingMessages(loggingMessages); } |
Any ideas why perfmon would only indicate 1 partition being processed at a time?

