Recorrer una estructura en arbol con múltiples hilos: ThreadedTreeBase

by Valeriano Tortola 13. octubre 2007 09:52

Esta clase, fruto de un fin de semana de inspiración y una semana de depurado en mis ratos libres, que no son muchos, sigue el planteamiento anterior para recorrer estructuras en arbol con múltiples hilos. Se trata de una clase abstracta que encapsula todo el mecanismo multithreading, simplemente hay que implementar un método que define como se obtienen sus hijos y suscribirse a un evento que es disparado por cada nodo.

Siempre es buena idea encapsular funcionalidades complejas de este tipo ya que:

  • Permite probar la funcionalidad en otros campos.
  • Permite hacer correciones de forma más sencilla. La lógica puede ser bastante complicada y no conviene mezclarla con la lógica de negocios.
  • Permite su reusabilidad.

La clase utiliza un tipo genérico para que pueda adaptarse a cualquier tipo. Los nodos encontrados se envian vía evento, junto a un StringBuilder que indica su jerarquia en el arbol y cuyo separador puede ser indicado también en una sobrecarga del constructor. Además, la clase argumento del evento está implementado dentro de la clase, lo que permite que use el tipo genérico de la clase ThreadedTreeBase. Las excepciones capturadas se devuelven también en un evento, por lo que conviene comprobar si la propiedad Error es null ó no cuando evaluemos un nodo.

Hay un método que permite cancelar la operación si esta en curso por medio de la evaluación de una variable run que se comprueba en cada ciclo.

Cuando se construye, se le indica el número de hilos que podrá manejar y la prioridad que se le quiere asignar.

public abstract class ThreadedTreeBase<NodeType>
  where NodeType : class
{
  // Objeto que controla el fin de la ejecución
  private readonly Object endLock = new Object();
 
  // Delegado para recursividad
  private delegate void runDlgt_(NodeType Node, StringBuilder Path);
  private readonly runDlgt_ runDlgt;
 
  // Control de hilos.
  private readonly Int32 maxThreads;
  private Int32 runningThreads = 0;
  public Int32 MaxThreads { get { return maxThreads; } }
 
  // Indicador de nodos por procesar
  private Int32 remainingNodes = 0;
 
  // Control de prioridad de ejecución.
  private readonly ThreadPriority priority;
  public ThreadPriority Priority { get { return priority; } }
 
  // Control de cancelación.
  private Int32 runFlag = 1;
 
  // Caracter delimitador de la ruta xpath
  private readonly Char xpathDelimiter = '/';
  public Char XpathDelimiter { get { return xpathDelimiter; } }
 
  // Evento de nodo procesado
  protected delegate void ProcessedNode_(ProcessedNodeEventArgs e);
  protected event ProcessedNode_ ProcessedNode;
  protected class ProcessedNodeEventArgs : EventArgs
  {
    private NodeType node;
    private String xpath;
    private Exception error;
 
    public NodeType Node { get { return node; } }
    public String XPath { get { return xpath; } }
    public Exception Error { get { return error; } }
 
    public ProcessedNodeEventArgs(NodeType Nodo, 
                                  String XPath, 
                                  Exception Error)
    {
      this.node = Nodo;
      this.xpath = XPath;
      this.error = Error;
    }
  }
 
  /// <summary>
  /// Constructor protegido.
  /// </summary>
  /// <param name="MaxThreads">Máximo de tareas concurrentes.</param>
  /// <param name="Priority">Indica la prioridad de los hilos.</param>
  protected ThreadedTreeBase(Int32 MaxThreads, ThreadPriority Priority)
  {
    runDlgt = new runDlgt_(Run);
    
    // Resto un hilo ya que hay que 
    // contar con el thread que inicia 
    // la clase
    this.maxThreads = MaxThreads-1;
    this.priority = Priority;
  }
 
  protected ThreadedTreeBase(Int32 MaxThreads, ThreadPriority Priority, 
                             Char XpathDelimiter): this(MaxThreads, Priority)
  {
    this.xpathDelimiter = XpathDelimiter;
  }
 
  /// <summary>
  /// Iniciar navegación del arbol.
  /// </summary>
  /// <param name="Parte">Parte inicial.</param>
  public void RunBrowser(NodeType nodo)
  {
    lock (endLock)
    {
      runDlgt.BeginInvoke(nodo, new StringBuilder(),
        delegate(IAsyncResult ia)
        {
          try { runDlgt.EndInvoke(ia); }
          catch (Exception Ex)
          {
            Debug.Fail(Ex.Message, Ex.StackTrace);
            sendNodeData(nodo ?? default(NodeType), null, Ex);
          }
        }, null);
      Monitor.Wait(endLock);
    }
  }
 
  /// <summary>
  /// Inicia recorrido desde nodo inicial.
  /// </summary>
  /// <param name="nodo">Nodo inicial</param>
  private void Run(NodeType Node, StringBuilder path)
  {
    // Compruebo la cancelación (nuevas bifurcaciones).
    if (Thread.VolatileRead(ref runFlag) == 0) return;
 
    // Asigno la prioridad al thread.
    Thread.CurrentThread.Priority = this.priority;
 
    // Obtengo la jerarquia
    path = new StringBuilder(path.ToString());
    path.AppendFormat("{0}{1}",this.xpathDelimiter, Node);
    
    // Envio el nodo vía evento
    sendNodeData(Node, path, null);
 
    // Obtengo los subnodos y los contabilizo
    NodeType[] childs = getNodeChilds(Node);
    lock (endLock) remainingNodes += childs.Length;
    
    // Recorro la lista de nodos hijos...
    foreach (NodeType child in childs)
    {
      // Compruebo la cancelación dentro del bucle.
      // (bifurcaciones en curso).
      if (Thread.VolatileRead(ref runFlag) == 0) return;
 
      // ... llamando recursivamente a cada uno de ellos..
      if (Thread.VolatileRead(ref runningThreads) < maxThreads)
      {
        // Incremento el número de hilos secundarios en ejecución.
        Interlocked.Increment(ref runningThreads);
        // Ejecuto en otro hilo.
        runDlgt.BeginInvoke(child, path,
          delegate(IAsyncResult ia)
            {
              try
              {
                Monitor.Enter(endLock);
                runDlgt.EndInvoke(ia);
              }
              catch (Exception Ex)
              {
                Debug.Fail(Ex.Message, Ex.StackTrace);
                NodeType userState = ia.AsyncState as NodeType;
                sendNodeData(userState ?? default(NodeType), null, Ex);
              }
              finally
              {
                runningThreads--;
                remainingNodes--;
                // Se pulsa sobre 'endLock' si no hay hilos 
                // secundarios en ejecución ni nodos pendientes.
                if ((runningThreads == 0) && (remainingNodes == 0))
                {
                  Monitor.Pulse(endLock);
                }
                Monitor.Exit(endLock);
              }
            }, child);
      }
      else
      {
        try
        {
          runDlgt.Invoke(child, path);
          lock (endLock)
          {
            remainingNodes--;
            if ((runningThreads == 0) && (remainingNodes == 0))
            {
              Monitor.Pulse(endLock);
            }
          }
        }
        catch (Exception Ex)
        {
          Debug.Fail(Ex.Message, Ex.StackTrace);
          sendNodeData(child ?? default(NodeType), null, Ex);
        }
      }
    }
  }
 
  /// <summary>
  /// Envia los datos vía evento.
  /// </summary>
  /// <param name="nodo">Nodo del que se obtuvo la información.</param>
  /// <param name="infoNodo">Información.</param>
  private void sendNodeData(NodeType nodo, 
                            StringBuilder path, Exception Ex)
  {
    if (ProcessedNode != null)
         ProcessedNode.Invoke(new ProcessedNodeEventArgs
           (nodo, path == null ? String.Empty : path.ToString(), Ex));
  }
 
  /// <summary>
  /// Cancela la ejecución.
  /// </summary>
  protected void CancelBrowse()
  {
    // Full memory barrier
    Interlocked.Exchange(ref runFlag, 0);
  }
 
  /// <summary>
  /// Función a implementar con la forma de obtener 
  /// los nodos hijos de otro nodo.
  /// </summary>
  /// <param name="nodo">Nodo padre</param>
  /// <returns>Lista de nodos</returns>
  protected abstract NodeType[] getNodeChilds(NodeType nodo);
}

Ahora un ejemplo de su implementación. El sistema de archivos es a fin de cuentas, una estructura en arbol, así que voy a hacer una pequeña implementación de esta clase que me diga cuantos archivos contiene un directorio contando los que hay en sus subdirectorios... en multihilo, por supuesto :D

Evidentemente, no tiene utilidad práctica lanzar múltiples hilos contra un recurso local, con latencia mínima y compartido para todos los hilos ... simplemente como prueba:

public class ThreadedTreeFileSystem 
  : ThreadedTreeBase<FileSystemInfo>
{
  // Contador 
  private long counter = 0;
  public Int64 Contador
  {
    get { return Thread.VolatileRead(ref counter); }
  }
 
  public ThreadedTreeFileSystem(int MaxThreads, ThreadPriority p)
      : base(MaxThreads, p)
  {
    // Me subscribo al evento que devuelve los nodos.
    this.ProcessedNode += new ThreadedTreeBase<FileSystemInfo>.ProcessedNode_
      (ExplosionSistemaDeArchivos_ProcessedNode);
  }
 
  void ExplosionSistemaDeArchivos_ProcessedNode
    (ThreadedTreeBase<FileSystemInfo>.ProcessedNodeEventArgs e)
  {
    // Si el nodo es un archivo, lo cuento y lo muestro.
    if (e.Node is FileInfo)
      Interlocked.Increment(ref counter);
  }
 
  protected override FileSystemInfo[] getNodeChilds(FileSystemInfo nodo)
  {
    // Obtengo la lista de subnodos.
    List<FileSystemInfo> array = new List<FileSystemInfo>();
    try
    {
      if (nodo is DirectoryInfo)
      {
        array.AddRange(((DirectoryInfo)nodo).GetDirectories("*"));
        array.AddRange(((DirectoryInfo)nodo).GetFiles());
      }
    }
    catch (SecurityException sEx)
    {
      Console.WriteLine("{0} : {1}", sEx.GetType().Name, sEx.Message);
    }
    return array.ToArray();
  }
}

La implementación tiene cuatro partes esenciales:

  1. La clase se define heredando de ThreadedTreeBase y especificando el tipo que usaremos como nodo.
  2. En el constructor, se pasan los parámetros adecuados y se define el evento.
  3. En el manejador del evento, se procesa el nodo, en este caso, únicamente se cuentan los archivos y se muestra su nombre. También podríamos ver la jerarquia en e.XPath.
  4. En GetNodeChilds, el método que debemos implementar para completar la clase abstracta, esta la lógica necesaria para obtener los subnodos de un nodo y lo devuelve en forma de array.

Es importante remarcar, que aunque todo el mecanismo multithreading esta encapsulado, seguimos necesitando sincronizar los accesos a la memoria compartida, como por ejemplo el acceso a counter, que se incrementa con Interlocked para evitar race conditions (condiciones de anticipación).

Para su uso...

string folder = Environment.GetFolderPath(Environment.SpecialFolder.Personal);
  
ThreadedTreeFileSystem browser =
    new ThreadedTreeFileSystem(5, ThreadPriority.Normal);
 
browser.RunBrowser(new DirectoryInfo(folder));
 
Console.WriteLine("-- Archivos: " + browser.Contador.ToString());

Cualquier feedback sobre esta clase ó su diseño es bienvenida :)

Tags: , , , ,

.NET 2.0 | C# 2.0

Comentarios

13/10/2007 11:03:39 #

trackback

Trackback from vtortola

Recorrer una estructura en arbol con multiples hilos: ThreadedTreeBase

vtortola |

15/10/2007 3:43:21 #

trackback

Trackback from vtortola

Recorrer una estructura en arbol con multiples hilos: ThreadedTreeBase

vtortola |

11/12/2007 19:00:26 #

espinete

Le felicito señor, interesantes post. Siga así con estos temas de asíncronismo, multi-threading.

Saludos.

espinete |

06/07/2008 0:54:08 #

trackback

Trackback from Pensando en asíncrono

Recursividad y yield return. Haciendo queries a colecciones en arbol en C# 2.0

Pensando en asíncrono |

Comentarios no permitidos