Wednesday, 19 August 2015

An extended Queue interface

In my work on JCTools I have implemented a fair number of concurrent access queues. The Queue interface is part of the java.util package and offers a larger API surface area than I found core to concurrent message passing on the one hand, and still missing others. I'm hoping to solicit some discussion on some new methods, and see if I can be convinced to implement those I decided to avoid thus far.

Inter-thread Message Passing vs. Generic Collection

The queue data structure traditionally has an offer/push/enqueue method for writing into the queue and a poll/pop/dequeue method for removing elements. This should be enough for anyone, right?
Turns out some other useful methods come to mind when using a queue:
  • peek: looking, not touching
  • size: for monitoring and heuristics and so on
  • isEmpty: a potentially more efficient assertion than size == 0
But I would argue that implementing the full scope of the java.util.Collection methods is slightly redundant. Why is it good to have offer/poll also exposed as add/remove/element?
I have chosen to avoid implementing iterators for queues in JCTools, they seem to introduce a large complexity and I'm not convinced users out there need an iterator for their concurrent queue. The duplicate collection methods I get from extending AbstractQueue for free are alright though I don't see why anyone would actually use them.
So lots of methods I don't want, but also some methods missing, in particular for concurrent queue semantics. In particular I have come to the conclusion the following are both practical to implement and helpful to users:
  • weakOffer/Poll/Peek: the same as the 'strong' methods but without the requirement that allowing for spurious negative return value. This is particularly useful for queues where we are able to detect the inability to claim the next slot independently of the queue being full or empty. The 'weak' prefix is different from the Atomic* classes definition of 'weak' as it has no ordering implications, but I've been unable to come up with a better name(suggestions are most welcome).
  • drain(receiver)/fill(provider): this is attractive for cases where a receiver/provider function exists such that calls to it are guaranteed to succeed (e.g. receiver could just throw elements away, producer might return a constant). There are some small savings to be made here around the function calling loop, but for some these small savings might prove significant. A perpetual version of these methods could also take some sort of backoff strategy and spin when hitting an empty/full condition (similar to the Disruptor's BatchProcessor workflow).
  • batchPoll/Offer: this is attractive for processing known number of elements out of/into a queue. This is particularly profitable when multiple consumer/producers contend as the granularity of contention is reduced. It is also potentially damaging as batches can introduce large 'bubbles' into the queue and increase the unfairness. These methods could support a receiver/provider interface or use an array (enables batch array copies, guarantees quick release of a 'bubble').
  • offeredElementsCount/polledElementsCount: this is purely for reporting and would tend to be a long reflecting the producer/consumer counter. For linked queues this adds a new overhead, so while this is a clearly useful feature I'm reluctant to make it a compulsory part of the new interface.

I plan to implement the above for JCTools in the near future. What do you think? Please opine in the comments or via the projects suggestion box.
Thanks :)

3 comments:

  1. weakOffer/Poll/Peek => relaxedOffer/Poll/Peek
    But maybe still confusing... not convinced myself anyway :p

    ReplyDelete
    Replies
    1. 'relaxed' and 'weak' both have memory ordering connotations, but relaxed is perhaps a more fitting description for relaxing the requirement.
      I want to stick with the Offer/Poll/Peek + prefix, but it would be good to find a fitting prefix...

      Delete
  2. https://github.com/real-logic/Agrona/blob/master/src/main/java/uk/co/real_logic/agrona/concurrent/QueuedPipe.java and https://github.com/real-logic/Agrona/blob/master/src/main/java/uk/co/real_logic/agrona/concurrent/Pipe.java might be of relevance here.

    ReplyDelete