En programación concurrente , un monitor es una construcción de sincronización que evita que los subprocesos accedan simultáneamente al estado de un objeto compartido y les permite esperar a que el estado cambie. Proporcionan un mecanismo para que los subprocesos renuncien temporalmente al acceso exclusivo para esperar a que se cumpla alguna condición, antes de recuperar el acceso exclusivo y reanudar su tarea. Un monitor consta de un mutex (bloqueo) y al menos una variable de condición . Una variable de condición se "señala" explícitamente cuando se modifica el estado del objeto, pasando temporalmente el mutex a otro subproceso que "espera" la variable condicional.
Otra definición de monitor es una clase , objeto o módulo seguro para subprocesos que envuelve un mutex para permitir de forma segura el acceso a un método o variable por parte de más de un subproceso . La característica definitoria de un monitor es que sus métodos se ejecutan con exclusión mutua : en cada punto del tiempo, como máximo un subproceso puede estar ejecutando cualquiera de sus métodos . Al usar una o más variables de condición, también puede proporcionar la capacidad para que los subprocesos esperen una determinada condición (utilizando así la definición anterior de "monitor"). En el resto de este artículo, este sentido de "monitor" se denominará "objeto/clase/módulo seguro para subprocesos".
Los monitores fueron inventados por Per Brinch Hansen [1] y CAR Hoare [ 2] y se implementaron por primera vez en el lenguaje Concurrent Pascal de Brinch Hansen . [3]
Mientras un hilo está ejecutando un método de un objeto seguro para hilos, se dice que ocupa el objeto, al mantener su mutex (bloqueo) . Los objetos seguros para hilos se implementan para hacer cumplir que, en cada punto en el tiempo, como máximo un hilo puede ocupar el objeto . El bloqueo, que inicialmente está desbloqueado, se bloquea al comienzo de cada método público y se desbloquea en cada retorno de cada método público.
Al llamar a uno de los métodos, un subproceso debe esperar hasta que ningún otro subproceso esté ejecutando ninguno de los métodos del objeto seguro para subprocesos antes de iniciar la ejecución de su método. Tenga en cuenta que sin esta exclusión mutua, dos subprocesos podrían provocar que se pierda o gane dinero sin motivo alguno. Por ejemplo, dos subprocesos que retiran 1000 de la cuenta podrían devolver verdadero, mientras que hacen que el saldo disminuya solo en 1000, de la siguiente manera: primero, ambos subprocesos obtienen el saldo actual, lo encuentran mayor que 1000 y le restan 1000; luego, ambos subprocesos almacenan el saldo y regresan.
Para muchas aplicaciones, la exclusión mutua no es suficiente. Los subprocesos que intentan realizar una operación pueden tener que esperar hasta que se cumpla alguna condición P. Un bucle de espera activo
mientras no ( P ) saltes
no funcionará, ya que la exclusión mutua evitará que cualquier otro subproceso ingrese al monitor para que la condición sea verdadera. Existen otras "soluciones", como tener un bucle que desbloquea el monitor, espera una cierta cantidad de tiempo, bloquea el monitor y verifica la condición P . Teóricamente, funciona y no se bloqueará, pero surgen problemas. Es difícil decidir una cantidad apropiada de tiempo de espera: demasiado pequeña y el subproceso acaparará la CPU, demasiado grande y aparentemente no responderá. Lo que se necesita es una forma de señalar al subproceso cuando la condición P es verdadera (o podría ser verdadera).
Un problema clásico de concurrencia es el del productor/consumidor acotado , en el que hay una cola o un búfer de anillo de tareas con un tamaño máximo, con uno o más subprocesos que son subprocesos "productores" que agregan tareas a la cola, y uno o más subprocesos adicionales que son subprocesos "consumidores" que sacan tareas de la cola. Se supone que la cola en sí no es segura para subprocesos y puede estar vacía, llena o entre vacía y llena. Siempre que la cola esté llena de tareas, necesitamos que los subprocesos productores se bloqueen hasta que haya espacio para que los subprocesos consumidores saquen tareas de la cola. Por otro lado, siempre que la cola esté vacía, necesitamos que los subprocesos consumidores se bloqueen hasta que haya más tareas disponibles debido a que los subprocesos productores las agreguen.
Como la cola es un objeto concurrente compartido entre subprocesos, los accesos a ella deben ser atómicos , porque la cola puede ponerse en un estado inconsistente durante el curso del acceso a la cola que nunca debe exponerse entre subprocesos. Por lo tanto, cualquier código que acceda a la cola constituye una sección crítica que debe sincronizarse por exclusión mutua. Si las instrucciones de código y procesador en secciones críticas de código que acceden a la cola pueden intercalarse mediante cambios de contexto arbitrarios entre subprocesos en el mismo procesador o mediante subprocesos que se ejecutan simultáneamente en múltiples procesadores, entonces existe el riesgo de exponer un estado inconsistente y causar condiciones de carrera .
Un enfoque ingenuo es diseñar el código con espera activa y sin sincronización, lo que hace que el código esté sujeto a condiciones de carrera:
cola RingBuffer global ; // Un búfer de anillo de tareas no seguro para subprocesos. // Método que representa el comportamiento de cada hilo productor: public method producer () { while ( true ) { task myTask = ...; // El productor crea una nueva tarea para agregar. while ( queue . isFull ()) {} // Espera activa hasta que la cola no esté llena. queue . enqueue ( myTask ); // Agrega la tarea a la cola. } } // Método que representa el comportamiento de cada hilo consumidor: public method consumer () { while ( true ) { while ( queue . isEmpty ()) {} // Espera activa hasta que la cola no esté vacía. myTask = queue . dequeue (); // Saca una tarea de la cola. doStuff ( myTask ); // Ve y haz algo con la tarea. } }
Este código tiene un problema grave, ya que los accesos a la cola se pueden interrumpir e intercalar con los accesos de otros subprocesos a la cola. Los métodos queue.enqueue y queue.dequeue probablemente tengan instrucciones para actualizar las variables miembro de la cola, como su tamaño, posiciones de inicio y fin, asignación y distribución de elementos de la cola, etc. Además, los métodos queue.isEmpty() y queue.isFull() también leen este estado compartido. Si se permite que los subprocesos de productor/consumidor se intercalen durante las llamadas a enqueue/dequeue, se puede exponer un estado inconsistente de la cola, lo que conduce a condiciones de carrera. Además, si un consumidor vacía la cola entre la salida de otro consumidor de la espera ocupada y la llamada a "dequeue", entonces el segundo consumidor intentará sacar de la cola una cola vacía, lo que conduce a un error. De la misma manera, si un productor llena la cola entre el momento en que otro productor sale de la espera ocupada y llama a "enqueue", entonces el segundo productor intentará agregar a una cola llena, lo que generará un error.
Un enfoque ingenuo para lograr la sincronización, como se mencionó anteriormente, es usar " spin-waiting ", en el que se utiliza un mutex para proteger las secciones críticas del código y se sigue usando busy-wait, adquiriendo y liberando el bloqueo entre cada verificación busy-wait.
global RingBuffer queue ; // Un búfer de anillo de tareas no seguro para subprocesos. global Lock queueLock ; // Un mutex para el búfer de anillo de tareas. // Método que representa el comportamiento de cada hilo productor: public method producer () { while ( true ) { task myTask = ...; // El productor crea una nueva tarea para agregar. queueLock . acquire (); // Adquirir bloqueo para verificación inicial de espera activa. while ( queue . isFull ()) { // Espera activa hasta que la cola no esté llena. queueLock . release (); // Eliminar el bloqueo temporalmente para permitir una oportunidad para otros subprocesos // que necesiten queueLock para ejecutarse para que un consumidor pueda tomar una tarea. queueLock . acquire (); // Volver a adquirir el bloqueo para la siguiente llamada a "queue.isFull()". } queue . enqueue ( myTask ); // Agrega la tarea a la cola. queueLock . release (); // Elimina el bloqueo de la cola hasta que lo necesitemos nuevamente para agregar la siguiente tarea. } } // Método que representa el comportamiento de cada hilo consumidor: public method consumer () { while ( true ) { queueLock . acquire (); // Adquirir bloqueo para comprobación inicial de espera activa. while ( queue . isEmpty ()) { // Espera activa hasta que la cola no esté vacía. queueLock . release (); // Eliminar el bloqueo temporalmente para dar una oportunidad a otros hilos // que necesiten queueLock para ejecutarse para que un productor pueda añadir una tarea. queueLock . acquire (); // Volver a adquirir el bloqueo para la siguiente llamada a "queue.isEmpty()". } myTask = queue . dequeue (); // Quitar una tarea de la cola. queueLock . release (); // Eliminar el bloqueo de la cola hasta que lo necesitemos de nuevo para quitar la siguiente tarea. doStuff ( myTask ); // Salir y hacer algo con la tarea. } }
Este método garantiza que no se produzca un estado inconsistente, pero desperdicia recursos de la CPU debido a la espera ocupada innecesaria. Incluso si la cola está vacía y los subprocesos del productor no tienen nada que agregar durante mucho tiempo, los subprocesos del consumidor siempre están esperando ocupados innecesariamente. Del mismo modo, incluso si los consumidores están bloqueados durante mucho tiempo en el procesamiento de sus tareas actuales y la cola está llena, los productores siempre están esperando ocupados. Este es un mecanismo derrochador. Lo que se necesita es una forma de hacer que los subprocesos del productor se bloqueen hasta que la cola no esté llena, y una forma de hacer que los subprocesos del consumidor se bloqueen hasta que la cola no esté vacía.
(NB: Los mutex en sí mismos también pueden ser bloqueos giratorios que involucran una espera activa para obtener el bloqueo, pero para resolver este problema de recursos de CPU desperdiciados, asumimos que queueLock no es un bloqueo giratorio y utiliza adecuadamente una cola de bloqueo de bloqueo).
La solución es utilizar variables de condición . Conceptualmente, una variable de condición es una cola de subprocesos, asociada con un mutex, en la que un subproceso puede esperar a que se cumpla alguna condición. Por lo tanto, cada variable de condición c está asociada con una aserción P c . Mientras un subproceso espera una variable de condición, no se considera que ese subproceso ocupe el monitor, por lo que otros subprocesos pueden ingresar al monitor para cambiar el estado del mismo. En la mayoría de los tipos de monitores, estos otros subprocesos pueden enviar señales a la variable de condición c para indicar que la aserción P c es verdadera en el estado actual.
Por lo tanto, hay tres operaciones principales sobre las variables de condición:
wait c, m
, donde c
es una variable de condición y m
es un mutex (bloqueo) asociado con el monitor. Esta operación es llamada por un hilo que necesita esperar hasta que la afirmación P c sea verdadera antes de continuar. Mientras el hilo está esperando, no ocupa el monitor. La función, y el contrato fundamental, de la operación "esperar" es realizar los siguientes pasos:m
,c
cola de espera" (también conocida como "cola de suspensión") a la "cola de ejecución" de hilos, ym
.c
la cola de espera de , el siguiente contador de programa que se ejecutará está en el paso 2, en medio de la función/ subrutina de "espera" . Por lo tanto, el hilo se inactiva y luego se despierta en medio de la operación de "espera".c
la cola de suspensión de y haber liberado el mutex, pero se produjo un cambio de subproceso preventivo antes de que el subproceso se durmiera, y otro subproceso llamado operación de señal (ver más abajo) al c
mover el primer subproceso de regreso fuera de c
la cola de . Tan pronto como el primer subproceso en cuestión vuelva a cambiar a, su contador de programa estará en el paso 1c, y se dormirá y no podrá ser despertado nuevamente, violando la invariante de que debería haber estado en c
la cola de suspensión de cuando se durmió. Otras condiciones de carrera dependen del orden de los pasos 1a y 1b, y dependen de dónde ocurre un cambio de contexto.signal c
, también conocido como notify c
, es llamado por un hilo para indicar que la afirmación P c es verdadera. Dependiendo del tipo y la implementación del monitor, esto mueve uno o más hilos de c
la cola de suspensión de a la "cola de listos", u otra cola para que se ejecute. Por lo general, se considera una buena práctica realizar la operación de "señal" antes de liberar el mutex m
asociado con c
, pero siempre que el código esté diseñado correctamente para la concurrencia y dependiendo de la implementación de subprocesos, a menudo también es aceptable liberar el bloqueo antes de la señalización. Dependiendo de la implementación de subprocesos, el orden de esto puede tener ramificaciones de prioridad de programación. (Algunos autores [¿ quiénes? ] en cambio abogan por una preferencia por liberar el bloqueo antes de la señalización). Una implementación de subprocesos debe documentar cualquier restricción especial en este orden.broadcast c
, también conocida como notifyAll c
, es una operación similar que despierta todos los subprocesos en la cola de espera de c. Esto vacía la cola de espera. Generalmente, cuando más de una condición de predicado está asociada con la misma variable de condición, la aplicación requerirá broadcast en lugar de signal porque un subproceso que espera la condición incorrecta podría ser despertado y luego volver a dormir inmediatamente sin despertar un subproceso que espera la condición correcta que acaba de volverse verdadera. De lo contrario, si la condición de predicado es uno a uno con la variable de condición asociada a ella, entonces signal puede ser más eficiente que broadcast .Como regla de diseño, se pueden asociar múltiples variables de condición con el mismo mutex, pero no al revés. (Esta es una correspondencia de uno a muchos ). Esto se debe a que el predicado P c es el mismo para todos los subprocesos que usan el monitor y debe protegerse con exclusión mutua de todos los demás subprocesos que podrían hacer que se cambie la condición o que podrían leerla mientras el subproceso en cuestión hace que se cambie, pero puede haber diferentes subprocesos que quieran esperar una condición diferente en la misma variable que requiera que se use el mismo mutex. En el ejemplo productor-consumidor descrito anteriormente, la cola debe estar protegida por un objeto mutex único, m
. Los subprocesos "productores" querrán esperar en un monitor usando un bloqueo m
y una variable de condición que se bloquea hasta que la cola no esté llena. Los subprocesos "consumidores" querrán esperar en un monitor diferente usando el mismo mutex pero una variable de condición diferente que se bloquea hasta que la cola no esté vacía. (Por lo general) nunca tendría sentido tener diferentes mutex para la misma variable de condición, pero este ejemplo clásico muestra por qué a menudo tiene sentido tener múltiples variables de condición que utilicen el mismo mutex. Un mutex utilizado por una o más variables de condición (uno o más monitores) también puede compartirse con código que no utiliza variables de condición (y que simplemente lo adquiere/libera sin ninguna operación de espera/señal), si esas secciones críticas no requieren esperar una determinada condición en los datos concurrentes.m
El uso básico adecuado de un monitor es:
acquire ( m ); // Adquirir el bloqueo de este monitor. while ( ! p ) { // Mientras la condición/predicado/afirmación que estamos esperando no sea verdadera... wait ( m , cv ); // Esperar el bloqueo y la variable de condición de este monitor. } // ... La sección crítica del código va aquí... signal ( cv2 ); // O: broadcast(cv2); // cv2 puede ser igual a cv o diferente. release ( m ); // Liberar el bloqueo de este monitor.
Lo que sigue es el mismo pseudocódigo pero con comentarios más detallados para explicar mejor lo que está sucediendo:
// ... (código anterior) // A punto de ingresar al monitor. // Adquirir el mutex asesor (bloqueo) asociado con los datos concurrentes que se comparten entre subprocesos, // para garantizar que no se puedan intercalar de manera preventiva dos subprocesos o ejecutarlos simultáneamente en diferentes núcleos mientras se ejecutan en secciones críticas que leen o escriben estos mismos datos concurrentes. Si otro subproceso tiene este mutex, este subproceso se pondrá en suspensión (se bloqueará) y se colocará en la cola de suspensión de m. (El mutex "m" no debe ser un spin-lock). acquire ( m ); // Ahora, tenemos el bloqueo y podemos verificar la condición por primera vez.// La primera vez que ejecutamos la condición del bucle while después de la "adquisición" anterior, nos preguntamos: "¿La condición/predicado/afirmación que estamos esperando ya es verdadera?"while ( ! p ()) // "p" es cualquier expresión (por ejemplo, variable o // llamada a función) que verifica la condición y // evalúa como booleana. Esta en sí misma es una sección crítica // por lo que *DEBE* mantener el bloqueo cuando // ejecute esta condición del bucle "while"! // Si esta no es la primera vez que se verifica la condición "while", // entonces nos estamos haciendo la pregunta, "Ahora que otro hilo que usa este // monitor me ha notificado y me ha despertado y he cambiado de contexto // de nuevo a, ¿la condición/predicado/afirmación que estamos esperando permaneció // verdadera entre el momento en que me desperté y el momento en que volví a adquirir // el bloqueo dentro de la llamada "wait" en la última iteración de este bucle, o // algún otro hilo hizo que la condición se volviera falsa nuevamente mientras tanto, haciendo de esto un despertar espurio? { // Si esta es la primera iteración del bucle, entonces la respuesta es // "no" -- la condición no está lista todavía. De lo contrario, la respuesta es: // lo último. Esta fue una activación espuria, otro subproceso ocurrió primero // y causó que la condición se volviera falsa nuevamente, y debemos // esperar nuevamente.wait ( m , cv ); // Prevenir temporalmente que cualquier otro hilo en cualquier núcleo haga // operaciones en m o cv. // release(m) // Liberar atómicamente el bloqueo "m" para que otro // // código que use estos datos concurrentes // // pueda operar, mover este hilo a la // // cola de espera de cv para que sea notificado // // en algún momento cuando la condición se vuelva // // verdadera, y suspender este hilo. Volver a habilitar // // otros hilos y núcleos para que hagan // // operaciones en m y cv. // // Se produce un cambio de contexto en este núcleo. // // En algún momento futuro, la condición que estamos esperando se vuelve // verdadera, y otro hilo que usa este monitor (m, cv) hace // una señal que despierta a este hilo, o una // transmisión que nos despierta, lo que significa que hemos sido sacados // de la cola de espera de cv. // // Durante este tiempo, otros hilos pueden hacer que la condición se vuelva // falsa nuevamente, o la condición puede alternar una o más // veces, o puede suceder que permanezca verdadera. // // Este hilo se vuelve a cambiar a algún núcleo. // // acquire(m) // Se vuelve a adquirir el bloqueo "m". // Finaliza esta iteración del bucle y vuelve a verificar la condición del bucle "while" para asegurarte de que el predicado sigue siendo verdadero. } // ¡La condición que estamos esperando es verdadera! // Todavía mantenemos el bloqueo, ya sea desde antes de ingresar al monitor o desde // la última ejecución de "esperar".// Aquí va la sección crítica del código, que tiene una condición previa de que nuestro predicado // debe ser verdadero. // Este código puede hacer que la condición de cv sea falsa, y/o hacer que los predicados de otras variables de condición // sean verdaderos.// Señal de llamada o transmisión, dependiendo de qué predicados de las variables de condición (que comparten el mutex m) se han hecho verdaderos o pueden haberse hecho verdaderos, // y el tipo semántico de monitor que se está utilizando.for ( cv_x in cvs_to_signal ) { signal ( cv_x ); // O: broadcast(cv_x); } // Uno o más subprocesos se han despertado pero se bloquearán tan pronto como intenten adquirir m. // Libera el mutex para que los hilos notificados y otros puedan ingresar a sus secciones críticas. release ( m );
Después de haber presentado el uso de variables de condición, vamos a utilizarlas para revisar y resolver el clásico problema de productor/consumidor acotado. La solución clásica es utilizar dos monitores, que comprenden dos variables de condición que comparten un bloqueo en la cola:
global volátil RingBuffer queue ; // Un búfer de anillo de tareas no seguro para subprocesos. global Lock queueLock ; // Un mutex para el búfer de anillo de tareas. (No un spin-lock.) global CV queueEmptyCV ; // Una variable de condición para subprocesos consumidores que esperan a que la cola // deje de estar vacía. Su bloqueo asociado es "queueLock". global CV queueFullCV ; // Una variable de condición para subprocesos productores que esperan a que la cola // deje de estar llena. Su bloqueo asociado también es "queueLock". // Método que representa el comportamiento de cada hilo productor: public method producer () { while ( true ) { // El productor crea una nueva tarea para agregar. task myTask = ...; // Adquirir "queueLock" para la comprobación del predicado inicial. queueLock . acquire (); // Sección crítica que verifica si la cola no está llena. while ( queue . isFull ()) { // Libera "queueLock", pone este hilo en cola en "queueFullCV" y duerme este hilo. wait ( queueLock , queueFullCV ); // Cuando este hilo se despierta, vuelve a adquirir "queueLock" para la próxima verificación de predicado. } // Sección crítica que agrega la tarea a la cola (tenga en cuenta que estamos manteniendo "queueLock"). queue . enqueue ( myTask ); // Despierta uno o todos los hilos consumidores que están esperando que la cola no esté vacía ahora que está garantizado, para que un hilo consumidor tome la tarea. signal ( queueEmptyCV ); // O: broadcast(queueEmptyCV); // Fin de las secciones críticas. // Liberamos "queueLock" hasta que lo necesitemos nuevamente para agregar la siguiente tarea. queueLock . release (); } } // Método que representa el comportamiento de cada hilo consumidor: public method consumer () { while ( true ) { // Adquirir "queueLock" para la comprobación del predicado inicial. queueLock . acquire (); // Sección crítica que verifica si la cola no está vacía. while ( queue . isEmpty ()) { // Libera "queueLock", pone este hilo en cola en "queueEmptyCV" y duerme este hilo. wait ( queueLock , queueEmptyCV ); // Cuando se despierta este hilo, vuelve a adquirir "queueLock" para la próxima verificación de predicado. } // Sección crítica que saca una tarea de la cola (tenga en cuenta que estamos manteniendo "queueLock"). myTask = queue . dequeue (); // Despierta uno o todos los hilos productores que están esperando que la cola no esté llena ahora que está garantizado, para que un hilo productor agregue una tarea. signal ( queueFullCV ); // O: broadcast(queueFullCV); // Fin de las secciones críticas. // Liberamos "queueLock" hasta que lo necesitemos nuevamente para realizar la siguiente tarea. queueLock . release (); // Sal y haz algo con la tarea. doStuff ( myTask ); } }
Esto garantiza la concurrencia entre los hilos del productor y del consumidor que comparten la cola de tareas, y bloquea los hilos que no tienen nada que hacer en lugar de esperar ocupados como se muestra en el enfoque mencionado anteriormente utilizando bloqueos de giro.
Una variante de esta solución podría utilizar una única variable de condición tanto para productores como para consumidores, tal vez denominada "queueFullOrEmptyCV" o "queueSizeChangedCV". En este caso, más de una condición está asociada con la variable de condición, de modo que la variable de condición representa una condición más débil que las condiciones que están siendo verificadas por subprocesos individuales. La variable de condición representa subprocesos que están esperando que la cola no esté llena y aquellos que esperan que no esté vacía. Sin embargo, hacer esto requeriría usar broadcast en todos los subprocesos que usan la variable de condición y no puede usar una señal regular . Esto se debe a que la señal regular podría despertar un subproceso del tipo incorrecto cuya condición aún no se ha cumplido, y ese subproceso volvería a dormirse sin que se envíe una señal a un subproceso del tipo correcto. Por ejemplo, un productor podría llenar la cola y despertar a otro productor en lugar de a un consumidor, y el productor despertado volvería a dormirse. En el caso complementario, un consumidor podría vaciar la cola y despertar a otro consumidor en lugar de a un productor, y el consumidor volvería a dormirse. El uso de la transmisión garantiza que algún hilo del tipo correcto procederá según lo esperado por el enunciado del problema.
Aquí está la variante que utiliza solo una variable de condición y transmisión:
global volátil RingBuffer queue ; // Un búfer de anillo de tareas no seguro para subprocesos. global Lock queueLock ; // Un mutex para el búfer de anillo de tareas. (No un spin-lock.) global CV queueFullOrEmptyCV ; // Una variable de condición única para cuando la cola no está lista para ningún subproceso // es decir, para subprocesos productores que esperan a que la cola deje de estar llena // y subprocesos consumidores que esperan a que la cola deje de estar vacía. // Su bloqueo asociado es "queueLock". // No es seguro usar "signal" normal porque está asociado con // múltiples condiciones de predicado (aserciones). // Método que representa el comportamiento de cada hilo productor: public method producer () { while ( true ) { // El productor crea una nueva tarea para agregar. task myTask = ...; // Adquirir "queueLock" para la comprobación del predicado inicial. queueLock . acquire (); // Sección crítica que verifica si la cola no está llena. while ( queue . isFull ()) { // Libera "queueLock", pone este hilo en cola en "queueFullOrEmptyCV" y duerme este hilo. wait ( queueLock , queueFullOrEmptyCV ); // Cuando este hilo se despierta, vuelve a adquirir "queueLock" para la próxima verificación de predicado. } // Sección crítica que agrega la tarea a la cola (tenga en cuenta que estamos manteniendo "queueLock"). queue . enqueue ( myTask ); // Despierta todos los hilos de productor y consumidor que están esperando que la cola esté respectivamente // no llena y no vacía ahora que esto último está garantizado, de modo que un hilo de consumidor tome la tarea. broadcast ( queueFullOrEmptyCV ); // No uses "signal" (ya que podría despertar solo a otro hilo de productor). // Fin de las secciones críticas. // Liberamos "queueLock" hasta que lo necesitemos nuevamente para agregar la siguiente tarea. queueLock . release (); } } // Método que representa el comportamiento de cada hilo consumidor: public method consumer () { while ( true ) { // Adquirir "queueLock" para la comprobación del predicado inicial. queueLock . acquire (); // Sección crítica que verifica si la cola no está vacía. while ( queue . isEmpty ()) { // Libera "queueLock", pone este hilo en cola en "queueFullOrEmptyCV" y duerme este hilo. wait ( queueLock , queueFullOrEmptyCV ); // Cuando se despierta este hilo, vuelve a adquirir "queueLock" para la próxima verificación de predicado. } // Sección crítica que saca una tarea de la cola (tenga en cuenta que estamos manteniendo "queueLock"). myTask = queue . dequeue (); // Despierta todos los hilos de productor y consumidor que están esperando que la cola esté respectivamente // no llena y no vacía ahora que lo primero está garantizado, de modo que un hilo de productor agregue una tarea. broadcast ( queueFullOrEmptyCV ); // No uses "signal" (ya que podría despertar solo a otro hilo de consumidor). // Fin de las secciones críticas. // Liberamos "queueLock" hasta que lo necesitemos nuevamente para realizar la siguiente tarea. queueLock . release (); // Sal y haz algo con la tarea. doStuff ( myTask ); } }
Los monitores se implementan utilizando una primitiva atómica de lectura-modificación-escritura y una primitiva de espera. La primitiva de lectura-modificación-escritura (normalmente de prueba y configuración o de comparación e intercambio) suele tener la forma de una instrucción de bloqueo de memoria proporcionada por la ISA , pero también puede estar compuesta por instrucciones sin bloqueo en dispositivos de un solo procesador cuando las interrupciones están deshabilitadas. La primitiva de espera puede ser un bucle de espera activa o una primitiva proporcionada por el SO que evita que el hilo se programe hasta que esté listo para continuar.
A continuación se muestra un ejemplo de implementación de pseudocódigo de partes de un sistema de subprocesos y mutexes y variables de condición de estilo Mesa, utilizando prueba y configuración y una política de "primero en llegar, primero en ser atendido":
// Partes básicas del sistema de subprocesos: // Suponga que "ThreadQueue" admite acceso aleatorio. public volcanic ThreadQueue readyQueue ; // Cola de subprocesos listos no segura para subprocesos. Los elementos son (Thread*). public volcanic global Thread * currentThread ; // Suponga que esta variable es por núcleo. (Las demás se comparten). // Implementa un bloqueo de giro solo en el estado sincronizado del propio sistema de subprocesos. // Esto se utiliza con test-and-set como primitivo de sincronización. public volcanic bool threadingSystemBusy = false ; // Rutina de servicio de interrupción de cambio de contexto (ISR): // En el núcleo de CPU actual, cambia preventivamente a otro hilo. public method contextSwitchISR () { if ( testAndSet ( threadingSystemBusy )) { return ; // No se puede cambiar de contexto en este momento. } // Asegúrese de que esta interrupción no pueda volver a ocurrir, lo que arruinaría el cambio de contexto: systemCall_disableInterrupts (); // Obtener todos los registros del proceso que se está ejecutando actualmente. // Para el contador de programa (PC), necesitaremos la ubicación de la instrucción // de la etiqueta "resume" a continuación. Obtener los valores de los registros depende de la plataforma y puede implicar // leer el marco de pila actual, instrucciones JMP/CALL, etc. (Los detalles están más allá de este alcance). currentThread -> registers = getAllRegisters (); // Almacenar los registros en el objeto "currentThread" en la memoria. currentThread -> registers . PC = resume ; // Establecer el próximo PC en la etiqueta "resume" a continuación en este método. readyQueue . enqueue ( currentThread ); // Vuelve a poner este hilo en la cola de hilos listos para su posterior ejecución. Thread * otherThread = readyQueue . dequeue (); // Elimina y obtiene el siguiente hilo que se ejecutará desde la cola de hilos listos. currentThread = otherThread ; // Reemplaza el valor del puntero global del hilo actual para que esté listo para el siguiente hilo. // Restaurar los registros de currentThread/otherThread, incluido un salto a la PC almacenada del otro hilo // (en "resume" a continuación). Nuevamente, los detalles de cómo se hace esto están más allá de este alcance. restoreRegisters ( otherThread.registers ); // *** ¡Ahora se está ejecutando "otherThread" (que ahora es "currentThread")! El hilo original ahora está "inactivo". *** resume : // Aquí es donde otra llamada contextSwitch() debe establecer la PC al cambiar el contexto nuevamente aquí. // Regresa al punto donde lo dejó otherThread. threadingSystemBusy = false ; // Debe ser una asignación atómica. systemCall_enableInterrupts (); // Vuelva a activar la conmutación preventiva en este núcleo. } // Método de suspensión de subprocesos: // En el núcleo de CPU actual, un cambio de contexto sincrónico a otro subproceso sin colocar // el subproceso actual en la cola de listos. // Debe contener "threadingSystemBusy" y las interrupciones deshabilitadas para que este método // no sea interrumpido por el temporizador de cambio de subproceso que llamaría contextSwitchISR(). // Después de regresar de este método, debe borrar "threadingSystemBusy". public method threadSleep () { // Obtener todos los registros del proceso que se está ejecutando actualmente. // Para el contador de programa (PC), necesitaremos la ubicación de la instrucción // de la etiqueta "resume" a continuación. Obtener los valores de los registros depende de la plataforma y puede implicar // leer el marco de pila actual, instrucciones JMP/CALL, etc. (Los detalles están más allá de este alcance). currentThread -> registers = getAllRegisters (); // Almacenar los registros en el objeto "currentThread" en la memoria. currentThread -> registers . PC = resume ; // Establecer el próximo PC en la etiqueta "resume" a continuación en este método. // A diferencia de contextSwitchISR(), no colocaremos currentThread nuevamente en readyQueue. // En cambio, ya se ha colocado en la cola de una variable de condición o de un mutex. Thread * otherThread = readyQueue . dequeue (); // Eliminar y obtener el próximo hilo que se ejecutará desde la cola de hilos listos. currentThread = otherThread ; // Reemplazar el valor del puntero global del hilo actual para que esté listo para el próximo hilo. // Restaurar los registros de currentThread/otherThread, incluido un salto a la PC almacenada del otro hilo // (en "resume" a continuación). Nuevamente, los detalles de cómo se hace esto están más allá de este alcance. restoreRegisters ( otherThread.registers ); // *** ¡Ahora se está ejecutando "otherThread" (que ahora es "currentThread")! El hilo original ahora está "inactivo". *** resume : // Aquí es donde otra llamada contextSwitch() debe establecer la PC al cambiar el contexto nuevamente aquí. // Regresa al punto donde lo dejó otherThread. }public method wait ( Mutex m , ConditionVariable c ) { // Bloqueo de giro interno mientras otros subprocesos en cualquier núcleo acceden a este objeto // "held" y "threadQueue", o "readyQueue". while ( testAndSet ( threadingSystemBusy )) {} // NB: "threadingSystemBusy" ahora es verdadero. // Llamada del sistema para deshabilitar interrupciones en este núcleo para que threadSleep() no sea interrumpido por // el temporizador de cambio de subproceso en este núcleo que llamaría contextSwitchISR(). // Hecho fuera de threadSleep() para mayor eficiencia para que este subproceso se duerma // justo después de ir a la cola de variables de condición. systemCall_disableInterrupts (); assert m . held ; // (Específicamente, este subproceso debe ser el que lo retiene). m . release (); c . waitingThreads . enqueue ( currentThread ); threadSleep (); // El hilo está inactivo... El hilo se despierta a partir de una señal/difusión. threadingSystemBusy = false ; // Debe ser una asignación atómica. systemCall_enableInterrupts (); // Vuelve a activar la conmutación preventiva en este núcleo. // Estilo Mesa: // Ahora pueden producirse cambios de contexto aquí, lo que hace que el predicado del llamador del cliente sea falso. m . acquire (); } public method signal ( ConditionVariable c ) { // Bloqueo de giro interno mientras otros subprocesos en cualquier núcleo acceden a este objeto // "held" y "threadQueue", o "readyQueue". while ( testAndSet ( threadingSystemBusy )) {} // NB: "threadingSystemBusy" ahora es verdadero. // Llamada del sistema para deshabilitar interrupciones en este núcleo para que threadSleep() no sea interrumpido por // el temporizador de cambio de subproceso en este núcleo que llamaría contextSwitchISR(). // Hecho fuera de threadSleep() para mayor eficiencia para que este subproceso se duerma // justo después de ir a la cola de variables de condición. systemCall_disableInterrupts (); if ( ! c . waitingThreads . isEmpty ()) { wakenThread = c . waitingThreads . dequeue (); readyQueue . enqueue ( wakenThread ); } threadingSystemBusy = false ; // Debe ser una asignación atómica. systemCall_enableInterrupts (); // Vuelva a activar la conmutación preventiva en este núcleo. // Estilo Mesa: // El hilo despertado no tiene ninguna prioridad. } public method broadcast ( ConditionVariable c ) { // Bloqueo de giro interno mientras otros subprocesos en cualquier núcleo acceden a este objeto // "held" y "threadQueue", o "readyQueue". while ( testAndSet ( threadingSystemBusy )) {} // NB: "threadingSystemBusy" ahora es verdadero. // Llamada del sistema para deshabilitar interrupciones en este núcleo para que threadSleep() no sea interrumpido por // el temporizador de cambio de subproceso en este núcleo que llamaría contextSwitchISR(). // Hecho fuera de threadSleep () para mayor eficiencia para que este subproceso se duerma // justo después de ir a la cola de variables de condición . systemCall_disableInterrupts ( ) ; while ( ! c.waitingThreads.isEmpty ( ) ) { wakenThread = c.waitingThreads.dequeue ( ) ; readyQueue.enqueue ( wakenThread ) ; } threadingSystemBusy = false ; // Debe ser una asignación atómica. systemCall_enableInterrupts (); // Activa nuevamente la conmutación preventiva en este núcleo. // Estilo Mesa: // Los hilos activados no reciben ninguna prioridad. } clase Mutex { protected volátil bool retenido = falso ; privado volátil ThreadQueue blockingThreads ; // Cola no segura para subprocesos de subprocesos bloqueados. Los elementos son (Thread*). método público adquirir () { // Bloqueo de giro interno mientras otros subprocesos en cualquier núcleo acceden a este objeto // "retenido" y "threadQueue", o "readyQueue". while ( testAndSet ( threadingSystemBusy )) {} // NB: "threadingSystemBusy" ahora es verdadero. // Llamada del sistema para deshabilitar interrupciones en este núcleo para que threadSleep() no sea interrumpido por // el temporizador de cambio de subproceso en este núcleo que llamaría contextSwitchISR(). // Hecho fuera de threadSleep() para mayor eficiencia para que este subproceso se duerma // justo después de ir a la cola de bloqueo. systemCall_disableInterrupts (); afirmar ! blockingThreads . contiene ( currentThread ); if ( held ) { // Coloque "currentThread" en la cola de este bloqueo para que se considere "durmiendo" en este bloqueo. // Tenga en cuenta que "currentThread" aún necesita ser manejado por threadSleep(). readyQueue . remove ( currentThread ); blockingThreads . enqueue ( currentThread ); threadSleep (); // Ahora estamos despertados, lo que debe ser porque "held" se volvió falso. assert ! held ; assert ! blockingThreads . contains ( currentThread ); } held = true ; threadingSystemBusy = false ; // Debe ser una asignación atómica. systemCall_enableInterrupts (); // Vuelva a activar la conmutación preventiva en este núcleo. } public method release () { // Bloqueo de giro interno mientras otros subprocesos en cualquier núcleo acceden a "held" y "threadQueue" de este objeto , o "readyQueue". while ( testAndSet ( threadingSystemBusy )) {} // NB: "threadingSystemBusy" ahora es verdadero. // Llamada del sistema para deshabilitar las interrupciones en este núcleo para mayor eficiencia. systemCall_disableInterrupts (); assert held ; // (La liberación solo debe realizarse mientras el bloqueo esté mantenido). held = false ; if ( ! blockingThreads.isEmpty ( )) { Thread * unblockedThread = blockingThreads.dequeue ( ) ; readyQueue.enqueue ( unblockedThread ) ; } threadingSystemBusy = false ; // Debe ser una asignación atómica. systemCall_enableInterrupts ( ); // Vuelva a activar la conmutación preventiva en este núcleo. } } estructura VariableCondición { volátil ThreadQueue esperandoHilos ; }
Las propuestas originales de CAR Hoare y Per Brinch Hansen eran para bloquear variables de condición . Con una variable de condición de bloqueo, el hilo de señalización debe esperar fuera del monitor (al menos) hasta que el hilo al que se le ha enviado la señal ceda la ocupación del monitor, ya sea regresando o esperando nuevamente en una variable de condición. Los monitores que utilizan variables de condición de bloqueo a menudo se denominan monitores de estilo Hoare o monitores de señal y espera urgente .
Suponemos que hay dos colas de subprocesos asociados con cada objeto de monitor
e
¿Es la cola de entrada?s
es una cola de subprocesos que han enviado señales.Además, suponemos que para cada variable de condición c , hay una cola
c.q
, que es una cola para subprocesos que esperan la condición variable cPor lo general, se garantiza que todas las colas sean justas y, en algunas implementaciones, se puede garantizar que sean las primeras en entrar, las primeras en salir .
La implementación de cada operación es la siguiente. (Suponemos que cada operación se ejecuta en exclusión mutua con las demás; por lo tanto, los subprocesos reiniciados no comienzan a ejecutarse hasta que se completa la operación).
entrar al monitor: Introduzca el método Si el monitor está bloqueado Añade este hilo a e bloquear este hilo demás bloquear el monitordejar el monitor: cronograma regresar del métodoespera c :Añade este hilo a c.q cronograma bloquear este hiloseñal c : si hay un hilo esperando en c.q Seleccione y elimine uno de esos hilos t de c .q (se llama "el hilo señalado") Añade este hilo a s reiniciar t (así que ocupará el monitor a continuación) bloquear este hilocronograma: Si hay un hilo sobre s Seleccione y elimine un hilo de s y reinícielo (este hilo ocupará el monitor a continuación) De lo contrario, si hay un hilo sobre e Seleccione y elimine un hilo de e y reinícielo (este hilo ocupará el monitor a continuación) demás Desbloquear el monitor (el monitor quedará desocupado)
La schedule
rutina selecciona el siguiente hilo que ocupará el monitor o, en ausencia de hilos candidatos, desbloquea el monitor.
La disciplina de señalización resultante se conoce como "señalización y espera urgente", ya que el emisor de señales debe esperar, pero se le da prioridad sobre los subprocesos en la cola de entrada. Una alternativa es "señalización y espera", en la que no hay s
cola y el emisor de señales espera en la e
cola.
Algunas implementaciones proporcionan una operación de señal y retorno que combina la señalización con el retorno de un procedimiento.
señal c y retorno : si hay un hilo esperando en c .q Seleccione y elimine uno de esos hilos t de c .q (se llama "el hilo señalado") reiniciar t (así que ocupará el monitor a continuación) demás cronograma regresar del método
En cualquier caso ("señal y espera urgente" o "señal y espera"), cuando se envía una señal a una variable de condición y hay al menos un subproceso esperando en la variable de condición, el subproceso de señalización entrega la ocupación al subproceso enviado la señal sin problemas, de modo que ningún otro subproceso pueda obtener la ocupación en el medio. Si P c es verdadero al comienzo de cada operación de señal c , será verdadero al final de cada operación de espera c . Esto se resume en los siguientes contratos . En estos contratos, I es el invariante del monitor .
entrar al monitor: postcondición Idejar el monitor: condición previa Iesperar c : la precondición I modifica el estado del monitor, la poscondición P c y Iseñal c : condición previa P c e I modifica el estado del monitor condición posterior Iseñal c y retorno : condición previa P c e I
En estos contratos, se supone que I y P c no dependen del contenido o la longitud de ninguna cola.
(Cuando se puede consultar la variable de condición en cuanto a la cantidad de subprocesos que esperan en su cola, se pueden proporcionar contratos más sofisticados. Por ejemplo, un par de contratos útil, que permite pasar la ocupación sin establecer la invariante, es:
esperar c : condición previa I modifica el estado del monitor condición posterior P c La precondición de la señal c ( no vacía( c ) y P c ) o (vacía( c ) y I ) modifica el estado de la poscondición del monitor I
(Véase Howard [4] y Buhr et al. [5] para más información).
Es importante señalar aquí que la afirmación P c depende enteramente del programador; él o ella simplemente debe ser coherente acerca de lo que es.
Concluimos esta sección con un ejemplo de una clase segura para subprocesos que utiliza un monitor de bloqueo que implementa una pila limitada y segura para subprocesos .
monitor class SharedStack { const privada capacidad := 10 int privado [capacidad] Un int privado tamaño := 0 invariante 0 <= tamaño y tamaño <= capacidad private BlockingCondition theStackIsNotEmpty /* asociado con 0 < tamaño y tamaño <= capacidad */ private BlockingCondition theStackIsNotFull /* asociado con 0 <= tamaño y tamaño < capacidad */ método público push( int valor) { si tamaño = capacidad entonces esperar theStackIsNotFull afirma 0 <= tamaño y tamaño < capacidad A[tamaño] := valor ; tamaño := tamaño + 1 afirmar 0 < tamaño y tamaño <= capacidad señala theStackIsNotEmpty y retorna } método público int pop() { si tamaño = 0 entonces espera theStackIsNotEmpty afirma 0 < tamaño y tamaño <= capacidad tamaño := tamaño - 1 ; afirmar 0 <= tamaño y tamaño < capacidad señalan theStackIsNotFull y devuelven A[tamaño] }}
Tenga en cuenta que, en este ejemplo, la pila segura para subprocesos proporciona internamente un mutex que, como en el ejemplo anterior de productor/consumidor, es compartido por ambas variables de condición, que están verificando diferentes condiciones en los mismos datos concurrentes. La única diferencia es que el ejemplo de productor/consumidor asumió una cola regular no segura para subprocesos y estaba usando un mutex independiente y variables de condición, sin abstraer estos detalles del monitor como es el caso aquí. En este ejemplo, cuando se llama a la operación "esperar", se debe proporcionar de alguna manera con el mutex de la pila segura para subprocesos, como si la operación "esperar" fuera una parte integrada de la "clase de monitor". Aparte de este tipo de funcionalidad abstraída, cuando se usa un monitor "sin procesar", siempre tendrá que incluir un mutex y una variable de condición, con un mutex único para cada variable de condición.
Con variables de condición no bloqueantes (también llamadas variables de condición de "estilo Mesa" o variables de condición de "señal y continuación" ), la señalización no hace que el subproceso de señalización pierda la ocupación del monitor. En cambio, los subprocesos señalizados se mueven a la e
cola. No hay necesidad de s
cola.
En el caso de las variables de condición no bloqueantes, la operación de señal suele denominarse notificación (terminología que seguiremos aquí). También es habitual proporcionar una operación de notificación a todos que mueve todos los subprocesos que esperan una variable de condición a la e
cola.
Aquí se da el significado de varias operaciones. (Suponemos que cada operación se ejecuta en exclusión mutua con las demás; por lo tanto, los subprocesos reiniciados no comienzan a ejecutarse hasta que se completa la operación).
entrar al monitor: Introduzca el método Si el monitor está bloqueado Añade este hilo a e bloquear este hilo demás bloquear el monitordejar el monitor: cronograma regresar del métodoespera c :Añade este hilo a c.q cronograma bloquear este hilonotificar c :Si hay un hilo esperando en c.q seleccionar y eliminar un hilo t de c .q (se llama "el hilo notificado") mover t a enotificar a todos c : mover todos los hilos que esperan en c .q a ecronograma : Si hay un hilo sobre e Seleccione y elimine un hilo de e y reinícielo demás Desbloquear el monitor
Como una variación de este esquema, el hilo notificado puede ser movido a una cola llamada w
, que tiene prioridad sobre e
. Consulte Howard [4] y Buhr et al. [5] para obtener más información.
Es posible asociar una afirmación P c con cada variable de condición c de modo que P c sea seguro que sea verdadero al regresar de . Sin embargo, uno debe asegurarse de que P c se conserve desde el momento en que el hilo notificador abandona la ocupación hasta que el hilo notificado es seleccionado para volver a ingresar al monitor. Entre estos momentos podría haber actividad por parte de otros ocupantes. Por lo tanto, es común que P c simplemente sea verdadero .wait c
Por esta razón, normalmente es necesario encerrar cada operación de espera en un bucle como este
mientras no ( P ) esperar c
donde P es alguna condición más fuerte que P c . Las operaciones y se tratan como "pistas" de que P puede ser verdadera para algún hilo en espera. Cada iteración de un bucle de este tipo después de la primera representa una notificación perdida; por lo tanto, con monitores no bloqueantes, uno debe tener cuidado para asegurarse de que no se pierdan demasiadas notificaciones.notify c
notify all c
Como ejemplo de "insinuación", considere una cuenta bancaria en la que un hilo de retiro esperará hasta que la cuenta tenga fondos suficientes antes de continuar.
clase de monitor Cuenta { int privado saldo := 0 invariante saldo >= 0 condición privada sin bloqueo saldoPuedeSerLoSuficientementeGrande método público retirar( int cantidad) condición previa cantidad >= 0 { mientras saldo < cantidad esperar saldoPuedeSerLoSuficientementeGrande afirmar saldo > = cantidad saldo := saldo - cantidad } método público deposit( int cantidad) condición previa cantidad >= 0 { saldo := saldo + importe notificar a todos los balancesMayBeBigEnough }}
En este ejemplo, la condición que se espera es una función del monto que se debe retirar, por lo que es imposible que un hilo que realiza un depósito sepa que ha cumplido esa condición. En este caso, tiene sentido permitir que cada hilo que espera ingrese al monitor (uno a la vez) para verificar si su afirmación es verdadera.
En el lenguaje Java , cada objeto puede utilizarse como monitor. Los métodos que requieren exclusión mutua deben marcarse explícitamente con la palabra clave sincronizada . Los bloques de código también pueden marcarse con la palabra clave sincronizada . [6]
En lugar de tener variables de condición explícitas, cada monitor (es decir, objeto) está equipado con una única cola de espera además de su cola de entrada. Toda la espera se realiza en esta única cola de espera y todas las operaciones de notificación y notificación a todas se aplican a esta cola. [7] Este enfoque se ha adoptado en otros lenguajes, por ejemplo C# .
Otro enfoque para la señalización es omitir la operación de señal . Siempre que un subproceso abandona el monitor (regresando o esperando), se evalúan las afirmaciones de todos los subprocesos en espera hasta que se encuentra que una es verdadera. En un sistema de este tipo, no se necesitan variables de condición, pero las afirmaciones deben estar codificadas explícitamente. El contrato para esperar es
esperar P : la precondición I modifica el estado del monitor, la poscondición P y la I
Brinch Hansen y Hoare desarrollaron el concepto de monitor a principios de la década de 1970, basándose en ideas anteriores propias y de Edsger Dijkstra . [8] Brinch Hansen publicó la primera notación de monitor, adoptando el concepto de clase de Simula 67 , [1] e inventó un mecanismo de cola. [9] Hoare refinó las reglas de reanudación de procesos. [2] Brinch Hansen creó la primera implementación de monitores, en Concurrent Pascal . [8] Hoare demostró su equivalencia con los semáforos .
Los monitores (y Concurrent Pascal) pronto se utilizaron para estructurar la sincronización de procesos en el sistema operativo Solo. [10] [11]
Los lenguajes de programación que admiten monitores incluyen:
Se han escrito varias bibliotecas que permiten construir monitores en lenguajes que no los admiten de forma nativa. Cuando se utilizan llamadas a bibliotecas, el programador debe marcar explícitamente el inicio y el final del código ejecutado con exclusión mutua. Pthreads es una de esas bibliotecas.