En informática , el problema productor-consumidor (también conocido como el problema del buffer acotado ) es una familia de problemas descritos por Edsger W. Dijkstra desde 1965.
Dijkstra encontró la solución al problema productor-consumidor mientras trabajaba como consultor para las computadoras Electrologica X1 y X8: "El primer uso del productor-consumidor fue en parte software, en parte hardware: el componente que se encargaba del transporte de información entre la tienda y el periférico se llamaba 'canal' ... La sincronización se controlaba mediante dos semáforos de conteo en lo que ahora conocemos como la disposición productor/consumidor: el semáforo que indicaba la longitud de la cola, se incrementaba (en una V) por la CPU y se decrementaba (en una P) por el canal, el otro, que contaba el número de finalizaciones no reconocidas, se incrementaba por el canal y se decrementaba por la CPU. [El segundo semáforo, al ser positivo, activaba la bandera de interrupción correspondiente.]" [1]
Dijkstra escribió sobre el caso del buffer ilimitado: "Consideramos dos procesos, que se denominan respectivamente 'productor' y 'consumidor'. El productor es un proceso cíclico y cada vez que pasa por su ciclo produce una cierta porción de información, que tiene que ser procesada por el consumidor. El consumidor también es un proceso cíclico y cada vez que pasa por su ciclo, puede procesar la siguiente porción de información, tal como ha sido producida por el productor... Suponemos que los dos procesos están conectados para este propósito a través de un buffer con capacidad ilimitada". [2]
Escribió sobre el caso del buffer acotado: "Hemos estudiado un productor y un consumidor acoplados a través de un buffer con capacidad ilimitada... La relación se vuelve simétrica, si los dos están acoplados a través de un buffer de tamaño finito, digamos N porciones" [3].
Y sobre el caso de múltiples productores-consumidores: “Consideramos una cantidad de pares productor/consumidor, donde el par i está acoplado a través de un flujo de información que contiene n i porciones. Suponemos que… el buffer finito que debería contener todas las porciones de todos los flujos tiene una capacidad de 'tot' porciones”. [4]
Per Brinch Hansen y Niklaus Wirth se dieron cuenta rápidamente del problema de los semáforos: "He llegado a la misma conclusión con respecto a los semáforos, es decir, que no son adecuados para lenguajes de alto nivel. En cambio, los eventos de sincronización naturales son intercambios de mensajes ". [5]
La solución original de buffer delimitado por semáforo se escribió en estilo ALGOL . El buffer puede almacenar N porciones o elementos. El semáforo de "número de porciones en cola" cuenta las ubicaciones llenas en el buffer, el semáforo de "número de posiciones vacías" cuenta las ubicaciones vacías en el buffer y el semáforo de "manipulación de buffer" funciona como mutex para las operaciones de poner y obtener el buffer. Si el buffer está lleno, es decir, el número de posiciones vacías es cero, el hilo productor esperará en la operación P(número de posiciones vacías). Si el buffer está vacío, es decir, el número de porciones en cola es cero, el hilo consumidor esperará en la operación P(número de porciones en cola). Las operaciones V() liberan los semáforos. Como efecto secundario, un hilo puede moverse de la cola de espera a la cola de listos. La operación P() disminuye el valor del semáforo a cero. La operación V() aumenta el valor del semáforo. [6]
begin número entero de porciones en cola , número de posiciones vacías , manipulación de buffer ; número de porciones en cola := 0 ; número de posiciones vacías := N ; manipulación de buffer := 1 ; parbegin productor : empezar de nuevo 1 : producir la siguiente porción ; P ( número de posiciones vacías ) ; P ( manipulación de buffer ) ; añadir porción al buffer ; V ( manipulación de buffer ) ; V ( número de porciones en cola ) ; ir de nuevo 1 fin ; consumidor : empezar de nuevo 2 : P ( número de porciones en cola ) ; P ( manipulación de buffer ) ; tomar porción del buffer ; V ( manipulación de buffer ) ; V ( número de posiciones vacías ) ; procesar porción tomada ; ir de nuevo 2 fin parend fin
A partir de C++ 20, los semáforos forman parte del lenguaje. La solución de Dijkstra se puede escribir fácilmente en C++ moderno. La variable buffer_manipulation es un mutex. La característica del semáforo de adquirir en un hilo y liberar en otro no es necesaria. La declaración lock_guard() en lugar de un par lock() y unlock() es C++ RAII . El destructor lock_guard garantiza la liberación del bloqueo en caso de una excepción. Esta solución puede manejar múltiples hilos consumidores y/o múltiples hilos productores.
#include <hilo> #include <mutex> #include <semáforo> std :: semáforo_de_conteo < N > número_de_porciones_en_cola { 0 }; std :: semáforo_de_conteo < N > número_de_posiciones_vacías { N } ; std :: manipulación_de_búfer_de_mutex ; void productor () { for (;;) { Porción porción = producir_siguiente_porción (); número_de_posiciones_vacías.adquirir ( ); { std :: lock_guard < std :: mutex > g ( manipulación_de_buffer ) ; agregar_porción_al_buffer ( porción ); } número_de_porciones_en_cola.liberar ( ) ; } } void consumidor () { para (;;) { numero_de_porciones_en_cola.adquirir (); Porción porción ; { std :: lock_guard < std :: mutex > g ( manipulación_de_buffer ) ; porción = tomar_porción_del_buffer ( ); } numero_de_posiciones_vacías.liberar ( ); procesar_porción_tomada ( porción ) ; } } int main () { std :: hilo t1 ( productor ) ; std :: hilo t2 ( consumidor ); t1.join ( ); t2.join ( ) ; }
Per Brinch Hansen definió el monitor: Utilizaré el término monitor para denotar una variable compartida y el conjunto de operaciones significativas sobre ella. El propósito de un monitor es controlar la programación de recursos entre procesos individuales de acuerdo con una determinada política. [7] Tony Hoare sentó las bases teóricas para el monitor. [8]
buffer acotado : monitor inicio buffer : matriz 0 .. N - 1 de porción ; cabeza , cola : 0 .. N - 1 ; conteo : 0 .. N ; no vacío , no lleno : condición ; procedimiento append ( x : porción ) ; comienzo si conteo = N entonces no lleno . esperar ; nota 0 <= conteo < N ; buffer [ cola ] := x ; cola := cola ( + ) 1 ; conteo := conteo + 1 ; no vacío . señal fin append ; procedimiento remove ( resultado x : porción ) ; comienzo si conteo = 0 entonces no vacío . esperar ; nota 0 < conteo <= N ; x := buffer [ cabeza ] ; cabeza := cabeza ( + ) 1 ; conteo := conteo - 1 ; no lleno . señal fin remove ; cabeza := 0 ; cola := 0 ; count := 0 ; fin del búfer acotado ;
El monitor es un objeto que contiene las variables buffer
, head
, tail
y count
para realizar un buffer circular , las variables de condición nonempty
y nonfull
para la sincronización y los métodos append
y remove
para acceder al buffer acotado. La operación del monitor esperar corresponde a la operación del semáforo P o adquirir, señal corresponde a V o liberar. Las operaciones en círculo (+) se toman módulo N. El pseudocódigo de estilo Pascal presentado muestra un monitor Hoare. Un monitor Mesa utiliza while count
en lugar de if count
. Una versión del lenguaje de programación C++ es:
clase Bounded_buffer { Porción buffer [ N ]; // 0..N-1 cabeza sin signo , cola ; // 0..N-1 recuento sin signo ; // 0..N std :: variable_de_condición no vacío , no lleno ; std :: mutex mtx ; público : void append ( Porción x ) { std :: unique_lock < std :: mutex > lck ( mtx ); no lleno .wait ( lck , [ & ] { return ! ( N == recuento ); }); assert ( 0 <= recuento && recuento < N ); buffer [ cola ++ ] = x ; cola %= N ; ++ recuento ; no vacío .notify_one (); } Porción remove () { std :: unique_lock < std :: mutex > lck ( mtx ); no vacío . esperar ( lck , [ & ]{ return ! ( 0 == count ); }); assert ( 0 < count && count <= N ); Porción x = buffer [ head ++ ]; head %= N ; -- count ; no lleno . notify_one (); return x ; } Bounded_buffer () { head = 0 ; tail = 0 ; count = 0 ; } };
La versión C++ necesita un mutex adicional por razones técnicas. Utiliza assert para hacer cumplir las condiciones previas para las operaciones de agregar y quitar búfer.
La primera solución productor-consumidor en las computadoras Electrologica usaba "canales". Hoare definió los canales : Una alternativa a la denominación explícita de origen y destino sería nombrar un puerto a través del cual se realizará la comunicación. Los nombres de los puertos serían locales para los procesos, y la manera en que los pares de puertos se conectarán mediante canales podría declararse en la cabecera de un comando paralelo. [9] Brinch Hansen implementó canales en los lenguajes de programación Joyce y Super Pascal . El lenguaje de programación del sistema operativo Plan 9 Alef y el lenguaje de programación del sistema operativo Inferno Limbo tienen canales. El siguiente código fuente C se compila en Plan 9 desde User Space :
#incluir "uh" #incluir "libc.h" #incluir "thread.h" enumeración { PILA = 8192 }; void productor ( void * v ) { Canal * ch = v ; para ( uint i = 1 ; ; ++ i ) { dormir ( 400 ); imprimir ( "p %d \n " , i ); sendul ( ch , i ); } } void consumidor ( void * v ) { Canal * ch = v ; para (;;) { uint p = recvul ( ch ); imprimir ( " \t\t c %d \n " , p ); dormir ( 200 + nrand ( 600 )); } } void threadmain ( int argc , char ** argv ) { int ( * mk ) ( void ( * fn ) ( void * ), void * arg , uint pila ); mk = threadcreate ; Canal * ch = chancreate ( sizeof ( ulong ), 1 ); mk ( productor , ch , PILA ); mk ( consumidor , ch , PILA ); recvp ( chancreate ( sizeof ( void * ), 0 )); threadexitsall ( 0 ); }
El punto de entrada del programa está en function threadmain
. La llamada a la función ch = chancreate(sizeof(ulong), 1)
crea el canal, la llamada a la función sendul(ch, i)
envía un valor al canal y la llamada a la función p = recvul(ch)
recibe un valor del canal. El lenguaje de programación Go también tiene canales. Un ejemplo de Go:
paquete principal importar ( "fmt" "math/rand" "tiempo" ) var enviarMsg = 0 func produceMessage ( ) int { tiempo.Sleep ( 400 * tiempo.Milisegundo ) sendMsg ++ fmt.Printf ( " sendMsg = %v\n" , sendMsg ) return sendMsg } func consumeMessage ( recvMsg int ) { fmt.Printf ( " \t\trecvMsg = %v \ n " , recvMsg ) tiempo.Sleep ( tiempo.Duration ( 200 + rand.Intn ( 600 ) ) * tiempo.Milisegundo ) } func main ( ) { ch : = make ( chan int , 3 ) go func ( ) { for { ch < - produceMessage ( ) } } ( ) for recvMsg : = range ch { consumeMessage ( recvMsg ) } }
La solución de productor-consumidor de Go utiliza la rutina principal de Go para el consumidor y crea una nueva rutina de Go sin nombre para el productor. Las dos rutinas de Go están conectadas con el canal ch. Este canal puede poner en cola hasta tres valores int. La instrucción ch := make(chan int, 3)
crea el canal, la instrucción ch <- produceMessage()
envía un valor al canal y la instrucción recvMsg := range ch
recibe un valor del canal. [10] La asignación de recursos de memoria, la asignación de recursos de procesamiento y la sincronización de recursos las realiza automáticamente el lenguaje de programación.
Leslie Lamport documentó una solución de productor-consumidor de buffer acotado para un productor y un consumidor: Suponemos que el buffer puede contener como máximo b mensajes, b >= 1. En nuestra solución, dejamos que k sea una constante mayor que b, y que s y r sean variables enteras que asumen valores entre 0 y k-1. Suponemos que inicialmente s=r y el buffer está vacío. Al elegir que k sea un múltiplo de b, el buffer se puede implementar como una matriz B [0: b - 1]. El productor simplemente pone cada nuevo mensaje en B[s mod b], y el consumidor toma cada mensaje de B[r mod b]. [11] El algoritmo se muestra a continuación, generalizado para k infinito.
Productor : L : si ( s - r ) mod k = b entonces goto L fi ; poner el mensaje en el buffer ; s := ( s + 1 ) mod k ; goto L ; Consumidor : L : si ( s - r ) mod k = 0 entonces goto L fi ; tomar el mensaje del buffer ; r := ( r + 1 ) mod k ; goto L ;
La solución de Lamport utiliza la espera activa en el subproceso en lugar de esperar en el programador. Esta solución ignora el impacto del cambio de subproceso del programador en momentos inconvenientes. Si el primer subproceso ha leído un valor de variable de la memoria, el programador cambia al segundo subproceso que cambia el valor de la variable y el programador vuelve al primer subproceso, entonces el primer subproceso utiliza el valor anterior de la variable, no el valor actual. La lectura-modificación-escritura atómica resuelve este problema. El C++ moderno ofrece atomic
variables y operaciones para la programación multiproceso. La siguiente solución de espera activa de C++11 para un productor y un consumidor utiliza operaciones atómicas de lectura-modificación-escritura fetch_add
y fetch_sub
en la variable atómica count
.
enum { N = 4 }; Buffer de mensajes [ N ]; std :: atomic < unsigned > count { 0 }; void productor () { unsigned tail { 0 }; for (;;) { Mensaje mensaje = produceMessage (); while ( N == count ) ; // ocupado esperando buffer [ tail ++ ] = mensaje ; tail %= N ; count . fetch_add ( 1 , std :: memory_order_relaxed ); } } void consumidor () { unsigned head { 0 }; for (;;) { while ( 0 == count ) ; // ocupado esperando Mensaje mensaje = buffer [ head ++ ]; head %= N ; count . fetch_sub ( 1 , std :: memory_order_relaxed ); consumeMessage ( mensaje ); } } int main () { std :: hilo t1 ( productor ); std :: hilo t2 ( consumidor ); t1 . unirse (); t2 . unirse (); }
Las variables de índice de búfer circular son locales del subproceso y, por lo tanto, no son relevantes para la consistencia de la memoria. La variable head
controla la espera activa del subproceso productor y consumidor.tail
count