Producer/Consumer con buffer

by Valeriano Tortola 3. marzo 2008 00:47

El modelo producer/consumer es un ejemplo de relación mediante semáforos ó monitores entre dos hilos que intercambian elementos de forma segura y controlada, de forma que cuando el "productor" introduce un elemento avisa a al "consumidor" de que ya puede leer. De esta forma, ambas partes están en idle mientras no hay nada que hacer y se ponen en marcha automáticamente cuando "hay faena".

Podemos ver un ejemplo en C# mediante el uso de la clase Monitor en la web de Jon Skeet.

Este ejemplo, es una variación que utiliza un buffer de elementos y añade la posibilidad de indicar que un elemento es el último, de forma que se anule la condición de espera para el/los hilos de lectura cuando no haya elementos, cuando le reader obtenga un null... es que no hay ni habrá más elementos en el buffer:

public class ProducerConsumerBuffered<T> 
    where T: class
{
    private Queue<T> buffer = new Queue<T>();
    private readonly Int32 max;
    private Boolean last = false;
 
    public ProducerConsumerBuffered(Int32 Max)
    {
        this.max = Max;
    }
 
    /// <summary>
    /// Introduce un elemento.
    /// </summary>
    /// <param name="item">Elemento</param>
    /// <param name="last">Indica si es el último elemento</param>
    public void Put(T item, Boolean last)
    {
        lock (buffer)
        {
            if(!this.last)
                this.last = last;
 
            buffer.Enqueue(item);
 
            Monitor.Pulse(buffer);
 
            // Si se ha alcanzado el máximo
            // bloqueo hasta que alguien lea
            while (buffer.Count == max)
                Monitor.Wait(buffer);
        }
    }
 
    /// <summary>
    /// Obtiene un elemento.
    /// </summary>
    /// <returns>Elemento</returns>
    public T Pop()
    {
        T r = null;
        lock (buffer)
        {
            // Si no hay elementos y no ha
            // aparecido el elemento final espero
            while(buffer.Count == 0 && !last)
                Monitor.Wait(buffer);
 
            if (buffer.Count>0) 
                r = buffer.Dequeue();
 
            Monitor.Pulse(buffer);
        }
        return r;
    }
}

Para probarlo, podemos usar un simple programa de prueba :

static ProducerConsumerBuffered<Byte[]> bf = 
    new ProducerConsumerBuffered<Byte[]>(20);
static void Main(string[] args)
{
    new Thread(new ThreadStart(write)).Start();
    new Thread(new ThreadStart(read)).Start();
 
    Console.WriteLine("ok ");
    Console.ReadKey(true);
}
 
static void write()
{
    Random r = new Random(5);
    for(Int32 i = 0; i<100;i++)
    {
        bf.Put(Encoding.ASCII.GetBytes(i.ToString()),i==99);
    }
    Console.WriteLine("Writer Exited");
}
 
static void read()
{
    Random r = new Random(5);
    Byte[] value = new Byte[0];
    while (value!=null)
    {
        value = bf.Pop();
        if(value!=null) 
            Console.WriteLine(Encoding.ASCII.GetString(value));
        Thread.Sleep(r.Next(1, 50));
    }
    Console.WriteLine("Reader Exited");
}

 Vemos que el writer termina y el reader aun sigue con los 20 elementos del buffer.

Se aceptan sugerencias :D

Tags: , , ,

.NET 2.0 | C# 2.0

Comentarios

04/03/2008 3:44:00 #

trackback

Trackback from Pensando en asíncrono

Producer/Consumer con buffer

Pensando en asíncrono |

Comentarios no permitidos