ConstrainQueueDepthDiscardTasks Behavior
I am having trouble fully understanding the task execution constraints behavior. Consider the following variation of the ThrottlingExample from the 1.5 documentation.
The documentation example exhibits the behavior that one might expect when pushing the limits of the machine (1 million posts in a loop), and it no doubt has its purpose. In that case, CPU utilization can be expected to be high, as is seen when running that example.
The following example might be more realistic for some enterprise situations. In this case the throttle depth is set to 1 and messages are posted in bursts of 200 with a 2 second delay between bursts. In this situation, even though the message rate is extremely low (an average of 100 per second), the CPU utilization is nearly 100 percent while executing the loop. Changing the messages per burst or the delay between them does not substantially change the situation - the CPUs are pegged and other applications (non-CCR threads) are not happy.
On the other hand, using the normal dispatcher queue (no constraints), these messages can easily be handled with extremely low CPU utilization.
The part that is even more confusing to me is that when the loop is complete, and while waiting an additional minute, the CPU utilization continues at nearly 100 percent until the dispatcher is disposed.
The behavior has changed since an earlier release where the constrained execution did not take up excessive cpu time.
Am I missing something here?
Thanks, Don
public void ThrottlingExample()
{
    bool useConstrant = true; // CHANGE TO SWITCH BETWEEN DISPATCH QUEUES
    //bool useConstrant = false;
    int maximumDepth = 1;
    Dispatcher dispatcher = new Dispatcher(0, "throttling example");
    DispatcherQueue depthThrottledQueue;
    if (useConstrant)
    {
        depthThrottledQueue =
            new DispatcherQueue("ConstrainQueueDepthDiscard", dispatcher,
                TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks, maximumDepth);
    }
    else
    {
        depthThrottledQueue =
            new DispatcherQueue("ConstrainQueueDepthDiscard", dispatcher);
    }
    Port<int> intPort = new Port<int>();
    Arbiter.Activate(depthThrottledQueue,
        Arbiter.Receive(true, intPort,
            delegate(int i)
            {
                // only some items will be received since throttling will discard most of them
                Console.WriteLine(i);
            })
    );
    for (int i = 0; i < maximumDepth * 10000; i += 200)
    {
        for (int j = 0; j < 200; j++) // post some at a time
        {
            intPort.Post(i + j);
        }
        System.Threading.Thread.Sleep(2000);
    }
    Console.WriteLine("All done");
    System.Threading.Thread.Sleep(60000); // wait a minute and watch cpu
    Console.WriteLine("Disposing");
    depthThrottledQueue.Dispose();
    dispatcher.Dispose();
}
Everything you are doing is reasonable, and you should be seeing very low CPU utilization with or without depth-constrained queues. The Console.WriteLine API, however, is really not a good API to call for tests like this. It makes deep calls into native code and the OS, it's synchronous, and it might cause weird scheduling behavior (since it will try to get exclusive access to the console, but we are touching it from multiple threads).
I will try this example, but in the meantime, can you just do a tight loop in your handler (something that consumes CPU) and report on the behavior? I want to reduce the variables so we will be able to blame just the CCR.
thanx
g
Hi George,
Yes, I'll try to do that right away. I initially noticed this behavior in a real test case that we'd run with an earlier release, where I have a service that receives WCF web service calls from WinForms clients and does scatter / gather of other WCF servers' information. As part of the service, it caches recent data from the other WCF server calls and returns that info immediately if it's recent enough for a given incoming request. If it's not recent enough, it keeps a (traditional) list of waiting user requests in its _state, each having queued a Timer for the _waitingUserExpiredTimerPort, and it then queues a WCF request for the data if needed (keeping track of the outstanding WCF request count). If any of these pending requests time out, they fire to the common timeout port _waitingUserExpiredTimerPort, which was on the constrained queue. That way a single event (or a few) on that handler could go through, respond to, and remove all aged pending requests in one operation, rather than going through my list for each timeout separately.
Arbiter.Activate(this.SingleDepthDiscardDispatchQueue,
    Arbiter.Receive<DateTime>(true, _waitingUserExpiredTimerPort,
        delegate(DateTime waiterTimeout)
        {
            // There may be multiple queued users that have posted timeouts..
            // The special DispatcherQueue will only fire once for a group
            // We will handle (remove) all of their DateTime msgs at once..
            bool moreToRemove = true;
            DateTime outDt;
            do
            {
                moreToRemove = _waitingUserExpiredTimerPort.Test(out outDt);
            }
            while (moreToRemove);
            // Now post a msg to the main interleave for actual processing in exclusive mode
            // (the exclusive handler goes thru the List<> and returns appropriate data to the pending user)
            _removeTimedWaitingUsersPort.Post(new RemoveTimedWaitingUsers());
        }));
In that case, I posted back on my _removeTimedWaitingUsersPort to an exclusive interleave handler to handle and remove those requests from the List<> and respond with any available gathered data, and/or error info for servers it had no gathered data for. Everything seemed very responsive with the earlier release.
So the constrained dispatcher queue and port were really used to combine multiple timer posts into a common post to the exclusive handler, so as to minimize the number of times the exclusive handler was hit.
The only other access to the _waitingUserExpiredTimerPort (and indirectly to the special constrained queue) was when the outstanding (outgoing) WCF proxy call completed asynchronously. At that point I also posted back to an exclusive main interleave port to update my _state variable to cache the new information, and within that exclusive handler I called the "updateHelper" method, which once more cleaned out any items in the port (probably not necessary).
The test case actually should not have stressed the system at all. Generally we might have sent at most a handful of incoming WCF requests to the service while the outgoing scatter / gather was happening, so the List<> would have had only that handful of messages as well as only a handful of timeout posts. (I did not try to cancel the timeout post to the port in the case where the outgoing WCF proxy had returned before the request timed out, but instead let the Timer fire to the port even though its associated request was no longer in my List<>.)
(_waitingUsers is my List<> that's holding those original request cmds that came in, plus a little context info)
// [called from Exclusive context]
private int updateHelper()
{
    bool moreToRemove = true;
    DateTime outDt;
    do // must clean up timer items.
    {
        moreToRemove = _waitingUserExpiredTimerPort.Test(out outDt);
    }
    while (moreToRemove);
    int updateCount = 0;
    foreach (VoIPServiceInfoWaitingUser waitingUser in _state._waitingUsers)
    {
        waitingUser._originalRequest.ResponsePort.Post(_state._info); // post back to user
        updateCount++;
    }
    _state._waitingUsers.Clear(); // clear up all waiting users, since we've handled them.
    return updateCount;
}
So I don't see that there is anything in particular that should be causing blocking or context switching during the handler itself, unless I'm misunderstanding something. Nor would the access to the port from within the above method, correct?
I did just happen to think that when I queue the Timer to the port, I'm doing that on the non-constrained dispatcher queue, but this should be ok I think? Below is the handler I post to (exclusive interleave handler) from my concurrent incoming request - (only in the case where I don't have recent enough cached data, and need to track the request)
// [Exclusive]
protected IEnumerator<ITask> VoIPMgmtServiceInfoWaitingUserHandler(VoIPServiceInfoWaitingUser addWaitingUserCmd)
{
    _state._waitingUsers.Add(addWaitingUserCmd); // this user needs notified on new data, or needs timed out
    TimeSpan ts = FstCcr.AsyncHelper.UTCTimeSpan(addWaitingUserCmd._originalRequest.Body._expiresTimeUtc);
    this.TaskQueue.EnqueueTimer(ts, _waitingUserExpiredTimerPort); // to kick off timeout handler
    _state.addWaitingUserStat(); // just updates stats
    // we need to ensure that we have something in progress to get us more current data
    queueWcfRequestForInfo(); // makes async WCF outgoing call if needed
    yield break;
}
Sorry to bore you with the details of the actual use case, but I wanted to give some background.
In that particular scenario with the 1.5 release, the whole server machine pretty much melted down (worse symptoms than this example) to where it was difficult to get things stopped. The behavior was the same on dual-core machines running XP and 8-core servers running Windows 2003.
I believe I have a sample I could send (can't post here) that has abstracted away all of the WCF stuff with dummy async handlers, etc, so it pretty much simulates the whole thing, but uses an underlying custom service base class.
I'll make the changes you suggest to the simple test as well and will let you know.
Thanks, Don
George,
With this code replacing the delegate in the original sample, so as to use only math and Random methods (Console.WriteLine commented out):
when running without constraint -> 0 to 2 percent CPU utilization
when running with constraint -> 93 to 99 percent CPU utilization, which continued at 98 percent for the next minute after "All done" was printed (up until the Dispose() method was called)
Arbiter.Activate(depthThrottledQueue,
    Arbiter.Receive(true, intPort,
        delegate(int i)
        {
            // only some items will be received since throttling will discard most of them
            //Console.WriteLine(i);
            Random rand = new Random();
            double x = 2;
            for (int k = 0; k < 1000; k++)
            {
                double d = rand.NextDouble();
                x = x * d;
            }
        })
);
//--
With this delegate replacing the one in the original post (no statements at all in the delegate handler):
when running without constraint -> 0 percent CPU utilization (1 second of CPU time over the whole run)
when running with constraint -> 93 to 99 percent CPU utilization, which continued at 98 percent for one minute after "All done" was printed, then cleaned up as expected with the Dispose() method call.
Arbiter.Activate(depthThrottledQueue,
    Arbiter.Receive(true, intPort,
        delegate(int i)
        {
            // only some items will be received since throttling will discard most of them
            //Console.WriteLine(i);
            //Random rand = new Random();
            //double x = 2;
            //for (int k = 0; k < 1000; k++)
            //{
            //    double d = rand.NextDouble();
            //    x = x * d;
            //}
        })
);
These tests ran on a dual-core machine with XP SP2.
Thanks,
Don
OK, I will try this simple repro then. Does ConstraintWithRate exhibit similar behavior for you?
I have not tried that. I won't be able to run that test until next week - I have production issues in front of me today.
OK, I tried your scenario and I can see the problem. For some reason, the sleep between the int posts causes the scheduler to spin if the throttling is enabled.
I'll debug it further and narrow it down. However, you have a workaround if this is blocking you:
1) Use a one-time receiver. Each time your delegate runs, use port.Test() to remove all old items.
2) At the end of your receiver, reactivate a one-time receive.
This will enable you to throttle with minimal perf difference compared to specifying a queue policy.
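A minimal sketch of that workaround, assuming the Microsoft.Ccr.Core assembly is referenced and using only the calls that already appear in this thread (Arbiter.Activate, Arbiter.Receive, Port<T>.Test(out T)). The class name OneTimeReceiverThrottle and the method name ActivateLatestOnly are made up for illustration, and the queue is assumed to be a normal DispatcherQueue with no execution policy:

```csharp
using System;
using Microsoft.Ccr.Core;

// Hypothetical helper, not part of the CCR.
static class OneTimeReceiverThrottle
{
    // Arms a one-time (non-persisted) receiver on an unconstrained queue.
    public static void ActivateLatestOnly(DispatcherQueue queue, Port<int> port)
    {
        Arbiter.Activate(queue,
            Arbiter.Receive(false, port,
                delegate(int first)
                {
                    // Drain whatever piled up while no receiver was active,
                    // keeping only the most recently posted item.
                    int latest = first;
                    int newer;
                    while (port.Test(out newer))
                    {
                        latest = newer;
                    }

                    Console.WriteLine(latest);

                    // Step 2: re-arm the one-time receiver for the next burst.
                    ActivateLatestOnly(queue, port);
                }));
    }
}
```

Because the receiver is not persisted, at most one handler instance is scheduled per burst, which gives roughly the effect of a depth of 1 without relying on the queue policy.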
Update:
I found the bug. The DispatcherQueue failed to reduce the count of pending items on its parent dispatcher, thus causing the worker threads to spin more than they should. This is not normally exposed, due to the inherent non-determinism when scheduling across lots of threads or a lot of items back to back. But your repro nicely exposed it.
Let us know how critical this is. It might justify a CCR 1.5 refresh (the version will be the same, so no one will need to recompile any code they have written, etc.).
g
Thanks George,
Yes, that's what I'd done to minimize the number of handler occurrences in the real test scenario when using the non-constrained dispatch queue.
I'll try to run the test against the frequency-based constraint next week and let you know, although I suspect you'll have a good clue about that issue once you get in there.
I do have a future feature request regarding scheduling (although I haven't fully thought it out):
We have several scenarios where we use specific "network traffic packet" type classes (not in CCR yet) that we have pre-allocated into pools to minimize GC overhead, etc., and have the async network receives use those packets. It would be very convenient in some cases to use a frequency-based (or depth-based) queue to throttle that traffic and have it deliver only the last N packets, exactly as you've designed it. However, we would like the dispatcher to post the packets that were "discarded" as being "too old" to an alternative port, where we could free those messages (which are the packet object instances) back to our pool of packets.
It may be more plausible to have the application do the same with one-time receivers (which doesn't really seem right when you're probably interested in a frequency based on the last 50 packets or similar), but having the dispatcher do it seems quite convenient, and it could probably do it much more efficiently than waiting for the normal handler to dig through them. It would free the UDP or TCP receiver from worrying about filtering before dispatching.
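To make the request concrete, here is a rough model of the idea in plain C#. It is not CCR code and does not reflect how the dispatcher is implemented: DiscardForwardingBuffer is a hypothetical class, the discard callback stands in for the "alternative port", and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not part of the CCR and not thread-safe.
public class DiscardForwardingBuffer<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _maximumDepth;
    private readonly Action<T> _onDiscard;

    public DiscardForwardingBuffer(int maximumDepth, Action<T> onDiscard)
    {
        if (maximumDepth < 1) throw new ArgumentOutOfRangeException("maximumDepth");
        if (onDiscard == null) throw new ArgumentNullException("onDiscard");
        _maximumDepth = maximumDepth;
        _onDiscard = onDiscard;
    }

    public int Count
    {
        get { return _items.Count; }
    }

    // Keeps only the newest maximumDepth items. Anything older is handed to
    // the discard callback instead of being silently dropped.
    public void Post(T item)
    {
        _items.Enqueue(item);
        while (_items.Count > _maximumDepth)
        {
            _onDiscard(_items.Dequeue());
        }
    }

    // Removes the oldest retained item, if any.
    public bool Test(out T item)
    {
        if (_items.Count > 0)
        {
            item = _items.Dequeue();
            return true;
        }
        item = default(T);
        return false;
    }
}
```

In the pooled-packet scenario, the discard callback would be the place to return a packet instance to the pool, so the normal handler only ever sees the last N packets.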
Thanks for the quick response - keep up the great work! I'm interested in what you find!
Don
OK, yes, thanks George, and great work finding it so quickly - I wish I had been able to isolate it to the constrained queue as quickly.
(Do you have any suggestions for determining where things are spinning in such a situation? I could tell that it wasn't in my user code, and could tell it was a dispatcher thread, as freezing them finally would nail it.)
It would be nice to have a refresh to resolve this issue - we have a number of scenarios where we use, and/or were planning on using, this same pattern.
Thanks again,
Don
A refresh is possible, or a private drop (still with the 1.5 version, so no need for a rebuild) to your company sooner if you are willing to sign an NDA. I assume you have purchased a commercial license anyway. Let us know.
Another alternative is to rely on the workaround, although it is not as convenient.
The CCR will not spin unless there is a bug, so there should be no need to determine whether it spins.
One hint might be the PendingItemcount on the Dispatcher, if that never seems to go to zero...
g
Yes, this is a request that makes sense, and we have found a need for it too. A port attached to the DispatcherQueue could receive all Tasks that were discarded, for example...
g
Yes, we would appreciate either the refresh or a private drop, depending on timing. Yes, Firesteel Technologies has commercial licenses and is willing to sign an NDA. If the refresh would be more than a few weeks out, a private drop would be preferable.
Thanks George - really appreciate the great work you and your team are doing!
Don
OK, thank you. We are planning a QFE package with the updated CCR DLL in a few weeks, which we will make available to everyone. If it takes more than a couple of weeks to release, we can contact you directly for a private drop.
thanx
g