El modelo producer/consumer es un ejemplo de relación mediante semáforos ó monitores entre dos hilos que intercambian elementos de forma segura y controlada, de forma que cuando el "productor" introduce un elemento avisa a al "consumidor" de que ya puede leer. De esta forma, ambas partes están en idle mientras no hay nada que hacer y se ponen en marcha automáticamente cuando "hay faena".
Podemos ver un ejemplo en C# mediante el uso de la clase Monitor en la web de Jon Skeet.
Este ejemplo, es una variación que utiliza un buffer de elementos y añade la posibilidad de indicar que un elemento es el último, de forma que se anule la condición de espera para el/los hilos de lectura cuando no haya elementos, cuando le reader obtenga un null... es que no hay ni habrá más elementos en el buffer:
public class ProducerConsumerBuffered<T>
where T: class
{
private Queue<T> buffer = new Queue<T>();
private readonly Int32 max;
private Boolean last = false;
public ProducerConsumerBuffered(Int32 Max)
{
this.max = Max;
}
/// <summary>
/// Introduce un elemento.
/// </summary>
/// <param name="item">Elemento</param>
/// <param name="last">Indica si es el último elemento</param>
public void Put(T item, Boolean last)
{
lock (buffer)
{
if(!this.last)
this.last = last;
buffer.Enqueue(item);
Monitor.Pulse(buffer);
// Si se ha alcanzado el máximo
// bloqueo hasta que alguien lea
while (buffer.Count == max)
Monitor.Wait(buffer);
}
}
/// <summary>
/// Obtiene un elemento.
/// </summary>
/// <returns>Elemento</returns>
public T Pop()
{
T r = null;
lock (buffer)
{
// Si no hay elementos y no ha
// aparecido el elemento final espero
while(buffer.Count == 0 && !last)
Monitor.Wait(buffer);
if (buffer.Count>0)
r = buffer.Dequeue();
Monitor.Pulse(buffer);
}
return r;
}
}
Para probarlo, podemos usar un simple programa de prueba :
static ProducerConsumerBuffered<Byte[]> bf =
new ProducerConsumerBuffered<Byte[]>(20);
static void Main(string[] args)
{
new Thread(new ThreadStart(write)).Start();
new Thread(new ThreadStart(read)).Start();
Console.WriteLine("ok ");
Console.ReadKey(true);
}
static void write()
{
Random r = new Random(5);
for(Int32 i = 0; i<100;i++)
{
bf.Put(Encoding.ASCII.GetBytes(i.ToString()),i==99);
}
Console.WriteLine("Writer Exited");
}
static void read()
{
Random r = new Random(5);
Byte[] value = new Byte[0];
while (value!=null)
{
value = bf.Pop();
if(value!=null)
Console.WriteLine(Encoding.ASCII.GetString(value));
Thread.Sleep(r.Next(1, 50));
}
Console.WriteLine("Reader Exited");
}
Vemos que el writer termina y el reader aun sigue con los 20 elementos del buffer.
Se aceptan sugerencias :D