Tag - multiprocessing

Thread safe object pool to manage scarce resource in application servers

By Stephane Carrez

Problem Description

In application servers some resources are expensive and they must be shared. This is the case for a database connection, a frame buffer used for image processing, a connection to a remote server, and so on. The problem is to make available these scarce resources in such a way that:

  • a resource is used by only one thread at a time,
  • we can control the maximum number of resources used at a time,
  • we have some flexibility to define such maximum when configuring the application server,
  • and of course the final solution is thread safe.

The common pattern used in such situation is to use a thread-safe pool of objects. Objects are picked from the pool when needed and restored back to the pool when they are no longer used.

Java thread safe object pool

Let's see how to implement our object pool in Java. We will use a generic class declaration into which we define a fixed array of objects. The pool array is allocated by the constructor and we will assume it will never change (hence the final keyword).

public class Pool<T> {
  private final T[] objects;
 
  public Pool<T>(int size) {
     objects = new T[size];
  }
  ...
}

First, we need a getInstance method that picks an object from the pool. The method must be thread safe and it is protected by the synchronized keyword. It there is no object, it has to wait until an object is available. For this, the method invokes wait to sleep until another thread releases an object. To keep track of the number of available objects, we will use an available counter that is decremented each time an object is used.

  private int available = 0;
  private int waiting = 0;
  public synchronized T getInstance() {
     while (available == 0) {
        waiting++;
        wait();
        waiting--;
     }
     available--;
     return objects[available];
  }

To know when to wakeup a thread, we keep track of the number of waiters in the waiting counter. A loop is also necessary to make sure we have an available object after being wakeup. Indeed, there is no guarantee that after being notified, we have an available object to return. The call to wait will release the lock on the pool and puts the thread is wait mode.

Releasing the object is provided by release. The object is put backed in the pool array and the available counter incremented. If some threads are waiting, one of them is awaken by calling notify.

  public synchronized void release(T obj) {
     objects[available] = obj;
     available++;
     if (waiting) {
        notify();
     }
  }

When the application is started, the pool is initialized and some pre-defined objects are inserted.

   class Item { ... };
...
   Pool<Item> pool = new Pool<Item>(10);
   for (int i = 0; i < 10; i++) {
       pool.release(new Item());
   }

Ada thread safe pool

The Ada object pool will be defined in a generic package and we will use a protected type. The protected type will guarantee the thread safe behavior of the implementation by making sure that only one thread executes the procedures.

generic
   type Element_Type is private;
package Util.Concurrent.Pools is
     type Element_Array_Access is private;
     Null_Element_Array : constant Element_Array_Access;
  ...
private
    type Element_Array is array (Positive range <>) of Element_Type;
    type Element_Array_Access is access all Element_Array;
     Null_Element_Array : constant Element_Array_Access := null;
end Util.Concurrent.Pools;   

The Ada protected type is simple with three procedures, we get the Get_Instance and Release as in the Java implementation. The Set_Size will take care of allocating the pool array (a job done by the Java pool constructor).

protected type Pool is
  entry Get_Instance (Item : out Element_Type);
  procedure Release (Item : in Element_Type);
  procedure Set_Size (Capacity : in Positive);
private
  Available     : Natural := 0;
  Elements      : Element_Array_Access := Null_Element_Array;
end Pool;

First, the Get_Instance procedure is defined as an entry so that we can define a condition to enter in it. Indeed, we need at least one object in the pool. Since we keep track of the number of available objects, we will use it as the entry condition. Thanks to this entry condition, the Ada implementation is a lot easier.

protected body Pool is
  entry Get_Instance (Item : out Element_Type) when Available > 0 is
  begin
     Item := Elements (Available);
     Available := Available - 1;
  end Get_Instance;
...
end Pool;

The Release operation is also easier as there is no need to wakeup any thread: the Ada runtime will do that for us.

protected body Pool is
   procedure Release (Item : in Element_Type) is
   begin
      Available := Available + 1;
      Elements (Available) := Item;
   end Release;
end Pool;

The pool is instantiated:

type Connection is ...;
package Connection_Pool is new Util.Concurrent.Pools (Connection);

And a pool object can be declared and initialized with some default object:

P : Connection_Pool.Pool;
C : Connection;
...
   P.Set_Size (Capacity => 10);
   for I in 1 .. 10 loop
         ...
         P.Release (C);
   end loop;

References

util-concurrent-pools.ads
util-concurrent-pools.adb

To add a comment, you must be connected. Login to add a comment

Thread safe cache updates in Java and Ada

By Stephane Carrez 2 comments

Problem Description

The problem is to update a cache that is almost never modified and only read in multi-threaded context. The read performance is critical and the goal is to reduce the thread contention as much as possible to obtain a fast and non-blocking path when reading the cache.

Cache Declaration

Java Implementation

Let's define the cache using the HashMap class.

public class Cache {
   private HashMap<String,String> map = new HashMap<String, String>();
}

Ada Implementation

In Ada, let's instantiate the Indefinite_Hashed_Maps package for the cache.

with Ada.Strings.Hash;
with Ada.Containers.Indefinite_Hashed_Maps;
...
  package Hash_Map is
    new Ada.Containers.Indefinite_Hashed_Maps (Key_Type => String,
                       Element_Type => String,
                       Hash => Hash,
                       "=" => "=");

  Map : Hash_Map.Map;

Solution 1: safe and concurrent implementation

This solution is a straightforward solution using the language thread safe constructs. In Java this solution does not allow several threads to look at the cache at the same time. The cache access will be serialized. This is not a problem with Ada, where multiple concurrent readers are allowed. Only writing locks the cache object

Java Implementation

The thread safe implementation is protected by the synchronized keyword. It guarantees mutual exclusions of threads invoking the getCache and addCache methods.

   public synchronized String getCache(String key) {
      return map.get(key);
   }
   public synchronized void addCache(String key, String value) {
      map.put(key, value);
   }

Ada Implementation

In Ada, we can use the protected type. The cache could be declared as follows:

  protected type Cache is
    function Get(Key : in String) return String;
    procedure Put(Key, Value: in String);
  private
    Map : Hash_Map.Map;
  end Cache;

and the implementation is straightforward:

  protected body Cache is
    function Get(Key : in String) return String is
    begin
       return Map.Element (Key);
    end Get;
    procedure Put(Key, Value: in String) is
    begin
       Map.Insert (Key, Value);
    end Put;
  end Cache;

Pros and Cons

+: This implementation is thread safe.

-: In Java, thread contention is high as only one thread can look in the cache at a time.

-: In Ada, thread contention occurs only if another thread updates the cache (which is far better than Java but could be annoying for realtime performance if the Put operation takes time).

-: Thread contention is high as only one thread can look in the cache at a time.

Solution 2: weak but efficient implementation

The Solution 1 does not allow multiple threads to access the cache at the same time, thus providing a contention point. The second solution proposed here, removes this contention point by relaxing some thread safety condition at the expense of cache behavior.

In this second solution, several threads can read the cache at the same time. The cache can be updated by one or several threads but the update does not guarantee that all entries added will be present in the cache. In other words, if two threads update the cache at the same time, the updated cache will contain only one of the new entry. This behavior can be acceptable in some cases and it may not fit for all uses. It must be used with great care.

Java Implementation

A cache entry can be added in a thread-safe manner using the following code:

   private volatile HashMap<String, String> map = new HashMap<String, String>();
   public String getCache(String key) {
      return map.get(key);
   }
   public void addCache(String key, String value) {
      HashMap<String, String> newMap = new HashMap<String, String>(map);

      newMap.put(newKey, newValue);
      map = newMap;
   }

This implementation is thread safe because the hash map is never modified. If a modification is made, it is done on a separate hash map object. The new hash map is then installed by the map = newMap assignment operation which is atomic. Again this code extract does not guarantee that all the cache entries added will be part of the cache.

Ada Implementation

The Ada implementation is slightly more complex basically because there is no garbage collector. If we allocate a new hash map and update the access pointer, we still have to free the old hash map when no other thread is accessing it.

The first step is to use a reference counter to automatically release the hash table when the last thread finishes its work. The reference counter will handle memory management issues for us. An implementation of thread-safe reference counter is provided by Ada Util. In this implementation, counters are updated using specific instruction (See Showing multiprocessor issue when updating a shared counter).

with Util.Refs;
...
   type Cache is new Util.Refs.Ref_Entity with record
      Map : Hash_Map.Map;
   end record;
   type Cache_Access is access all Cache;

   package Cache_Ref is new Util.Refs.References (Element_Type => Cache,
                Element_Access => Cache_Access);

  C : Cache_Ref.Atomic_Ref;

Source: Util.Refs.ads, Util.Refs.adb

The References package defines a Ref type representing the reference to a Cache instance. To be able to replace a reference by another one in an atomic manner, it is necessary to use the Atomic_Ref type. This is necessary because the Ada assignment of an Ref type is not atomic (the assignment copy and the call to the Adjust operation to update the reference counter are not atomic). The Atomic_Ref type is a protected type that provides a getter and a setter. Their use guarantees the atomicity.

    function Get(Key : in String) return String is
      R : constant Cache_Ref.Ref := C.Get;
    begin
       return R.Value.Map.Element (Key); -- concurrent access
    end Get;
    procedure Put(Key, Value: in String) is
       R : constant Cache_Ref.Ref := C.Get;
       N : constant Cache_Ref.Ref := Cache_Ref.Create;
    begin
       N.Value.all.Map := R.Value.Map;
       N.Value.all.Insert (Key, Value);
       C.Set (N); -- install the new map atomically
    end Put;

Pros and Cons

+: high performance in SMP environments
+: no thread contention in Java
-: cache update can loose some entries
-: still some thread contention in Ada but limited to copying a reference (C.Set)

2 comments
To add a comment, you must be connected. Login to add a comment

Showing multiprocessor issue when updating a shared counter

By Stephane Carrez

When working on several Ada concurrent counter implementations, I was interested to point out the concurrent issue that exists in multi-processor environment. This article explains why you really have to take this issue seriously in multi-tasks applications, specially because multi-core processors are now quite common.

What's the issue

Let's say we have a simple integer shared by several tasks:

Counter : Integer;

And several tasks will use the following statement to increment the counter:

  Counter := Counter + 1;

We will see that this implementation is wrong (even if a single instruction is used).

Multi task increment sample

To show up the issue, let's define two counters. One not protected and another protected from concurrent accesses by using a specific data structure provided by the Ada Util library.


with Util.Concurrent.Counters;
..
  Unsafe  : Integer := 0;
  Counter : Util.Concurrent.Counters.Counter;

In our testing procedure, let's declare a task type that will increment both versions of our counters. Several tasks will run concurrently so that the shared counter variables will experience a lot of concurrent accesses. The task type is declared in a declare block inside our procedure so that we will benefit from task synchronisation at the end of the block (See RM 7.6, and RM 9.3).

Each task will increment both counters in a loop. We should expect the two counters to get the same value at the end. We will see this is not the case in multi-processor environments.

declare
  task type Worker is
    entry Start (Count : in Natural);
  end Worker;

  task body Worker is
    Cnt : Natural;
  begin
      accept Start (Count : in Natural) do
        Cnt := Count;
      end;
      for I in 1 .. Cnt loop
        Util.Concurrent.Counters.Increment (Counter);
        Unsafe := Unsafe + 1;
      end loop;
  end Worker;

Now, in the same declaration block, we will define an array of tasks to show up the concurrency.

   type Worker_Array is array (1 .. Task_Count) of Worker;
   Tasks : Worker_Array;

Our tasks are activated and they are waiting to get the counter. Let's make our tasks count 10 million times.

begin
  for I in Tasks'Range loop
    Tasks (I).Start (10_000_000);
  end loop;
end;

Before leaving the declare scope, Ada will wait until the tasks have finished. (yes, there is no need to write any pthread_join code). After this block, we can just print out the value stored in the two counters and compare them:

Log.Info ("Counter value at the end       : " & Integer'Image (Value (Counter)));
Log.Info ("Unprotected counter at the end : " & Integer'Image (Unsafe));

The complete source is available in the Ada Util project in multipro.adb.

The Results

With one task, everything is Ok (Indeed!):

Starting  1 tasks
Expected value at the end      :  10000000
Counter value at the end       :  10000000
Unprotected counter at the end :  10000000

With two tasks, the problem appears:

Starting  2 tasks
Expected value at the end      :  10000000
Counter value at the end       :  10000000
Unprotected counter at the end :  8033821

And it aggravates as the number of tasks increases.

Starting  16 tasks
Expected value at the end      :  10000000
Counter value at the end       :  10000000
Unprotected counter at the end :  2496811

(The above results have been produced on an Intel Core Quad; Similar problems show up on Atom processors as well)

Explanation

On x86 processors, the compiler can use an incl instruction for the unsafe counter increment. So, one instruction for our increment. You thought it was thread safe. Big mistake!

  incl %(eax)

This instruction is atomic in a mono-processor environment meaning that it cannot be interrupted. However, in a multi-processor environment, each processor has its own memory cache (L1 cache) and will read and increment the value into its own cache. Caches are synchronized but this is almost always too late. Indeed, two processors can read their L1 cache, increment the value and save it at the same time (thus, loosing one increment). This is what is happening with the unprotected counter.

Let's see how to do the protection.

Protection with specific assembly instruction

To avoid this, it is necessary to use special instructions that will force the memory location to be synchronized and locked until the instruction completes. On x86, this is achieved by the lock instruction prefix. The following is guaranteed to be atomic on multi-processors:

  lock
  incl %(eax)

The lock instruction prefix introduces a delay to the execution of the instruction it protects. This delay increases slightly when concurrency occurs but it remains acceptable (up to 10 times slower).

For Sparc, Mips and other processors, the implementation requires to loop until either a lock is get (Spinlock) or it is guaranteed that no other processor has modified the counter at the same time.

Source: Util.Concurrent.Counters.ads, Util.Concurrent.Counters.adb

Protection with an Ada protected type

A safe and portable counter implementation can be made by using Ada protected types. The protected type allows to define a protected procedure Increment which provides an exclusive read-write access to the data (RM 9.5.1). The protected function Value will offer a concurrent read-only access to the data.

package Util.Concurrent.Counters is
    type Counter is limited private;
    procedure Increment (C : in out Counter);
    function Value (C : in Counter) return Integer;
private
  protected type Cnt is
      procedure Increment;
      function Get return Integer;
   private
      N : Integer := 0;
   end Cnt;
   type Counter is limited record
      Value : Cnt;
   end record;
end Util.Concurrent.Counters;

Source: Util.Concurrent.Counters.ads, Util.Concurrent.Counters.adb

To add a comment, you must be connected. Login to add a comment