Introducción a la programación en paralelo con Python

En esta entrada, te presentamos una introducción a cómo usar Python para
programar en paralelo.

Para ello, usaremos la librería concurrent.futures que viene incluida en la distribución estándar de Python.

La programación en paralelo consiste en ejecutar varias tareas al mismo
tiempo usando diferentes procesadores de nuestra máquina. Esto es útil
cuando tenemos que realizar tareas que son independientes entre sí, como
por ejemplo, descargar varias imágenes de internet, o procesar varios
archivos de texto. De esta manera podemos reducir el tiempo de ejecución
de nuestro programa.

Para poder ejecutar tareas en paralelo, necesitamos tener más de un procesador en nuestra máquina. En este caso, usaremos un procesador con 16 núcleos. Si no sabes cuántos núcleos tiene tu procesador, puedes usar la función cpu_count() de la librería multiprocessing para averiguarlo:

In[ ]:
from multiprocessing import cpu_count

cpu_count()
Out[ ]:
16

Importante: El código presentado en esta entrada está pensado para ser utilizado en un Jupyter Notebook, sin embargo, el uso de multiprosesadores en Jupyter Notebook es compatible únicamente con Linux. Si deseas emplear este código en Windows, debes hacerlo mediante WLS (Windows Subsystem for Linux); puedes encontrar una guía de cómo abrir un Jupyter Notebook en WLS aquí o en este video.

Para ilustrar el procedimiento, vamos a usar un ejemplo sencillo de tareas independientes que suelen requerir algo de tiempo de ejecución: comprobar que un número de varios dígitos es primo. Para esto, definimos una función que comprueba si un número es primo o no:

In[ ]:
from math import sqrt, floor

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(floor(sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

Además, definamos una lista de números primos para probar nuestro código:

In[ ]:
numbers = [
    8993096507401177, 2908593418964293, 7777345695645151,
    7926824204432719, 8095314844630399, 8879539710751067,
    9248541259623923, 1655261748084071, 4943779600461619,
    7556956264736881, 1037564015191141, 7343460960245491,
    5896858405913831, 8070657685273853, 6155674353544883]

Iniciemos visualizando el tiempo que tarda en comprobar si cada número de la lista es primo:

In[ ]:
from time import time

start = time()
print(is_prime(numbers[0]))
end = time()
print(end - start, 'segundos')
Out[ ]:
True
1.965362310409546 segundos

Formas de usar multiprocesadores

Vamos a revisar tres formas de utilizar multiprocesadores con la librería concurrent.futures: ejecutando varias funciones independientes, ejecutando una misma función varias veces mediante un bucle y ejecutando una función mediante un mapeo a una lista.

Ejecutando varias funciones independientes

Supongamos que queremos ejecutar cuatro tareas de manera secuencial e ir guardando los resultados de las mismas en una lista. Esto lo realizaríamos de la siguiente manera:

In[ ]:
results = []

start = time()
results.append(is_prime(numbers[0]))
results.append(is_prime(numbers[1]))
results.append(is_prime(numbers[2]))
results.append(is_prime(numbers[3]))
end = time()

print(results)
print(end - start, 'segundos')
Out[ ]:
[True, True, True, True]
6.45605731010437 segundos

Para realizar esta misma acción usando multiprocesadores, necesitamos la función ProcessPoolExecutor de la librería concurrent.futures. Esta nos permitirá generar un
pool de procesos que se ejecutarán de manera paralela.

Primero, debemos definir una lista donde guardaremos las tareas que deseamos ejecutar, a esta la llamaremos futures. Luego de esto, definiremos el pool con la función ProcessPoolExecutor, al cual llamaremos executor; en esta función podemos indicar el número de procesadores que queremos que se usen, en este caso usaremos 2. Además, dado que los procesos se ejecutan de manera paralela, unos procesos terminarán antes que otros, por lo tanto, debemos esperar que todos los procesos finalicen para poder obtener los resultados, para esto usamos la función wait.

In[ ]:
from concurrent.futures import ProcessPoolExecutor, wait

futures = []

with ProcessPoolExecutor(2) as executor:
    start = time()
    futures.append(executor.submit(is_prime, numbers[0]))
    futures.append(executor.submit(is_prime, numbers[1]))
    futures.append(executor.submit(is_prime, numbers[2]))
    futures.append(executor.submit(is_prime, numbers[3]))

    wait(futures)

    results = [future.result() for future in futures]
    end = time()

print(results)
print(end - start, 'segundos')
Out[ ]:
[True, True, True, True]
3.9698755741119385 segundos

Como vemos, al usar 2 procesadores, el tiempo de ejecución se reduce aproximadamente a la mitad.

Otra opción, más común, es no esperar a que todos los procesos finalices, sino ir obteniendo los resultados a medida que estos se van generando. Para esto, usamos la función as_completed, la cual nos permite obtener los resultados de los procesos a medida que estos se van completando.

In[ ]:
from concurrent.futures import ProcessPoolExecutor, as_completed

futures = []

with ProcessPoolExecutor(2) as executor:
    start = time()
    futures.append(executor.submit(is_prime, numbers[0]))
    futures.append(executor.submit(is_prime, numbers[1]))
    futures.append(executor.submit(is_prime, numbers[2]))
    futures.append(executor.submit(is_prime, numbers[3]))

    results = [future.result() for future in as_completed(futures)]
    end = time()

print(results)
print(end - start, 'segundos')
Out[ ]:
[True, True, True, True]
3.9460644721984863 segundos

Ejecutando una misma función varias veces mediante un bucle

Supongamos que queremos ejecutar la misma función varias veces, pero con diferentes argumentos. Esto lo realizaríamos utilizando un bucle, en el caso secuencial sería de la siguiente manera:

In[ ]:
results = []

start = time()
for number in numbers:
    results.append(is_prime(number))
end = time()

print(results)
print(end - start, 'segundos')
Out[ ]:
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
23.835153818130493 segundos

Para realizar esta misma acción usando multiprocesadores, tenemos el siguiente código, en este caso, utilizaremos 16 procesadores:

In[ ]:
results = []
futures = []

with ProcessPoolExecutor(16) as executor:
    start = time()
    for number in numbers:
        futures.append(executor.submit(is_prime, number))
    
    for result in as_completed(futures):
        results.append(result.result())
    end = time()
    
print(results)
print(end - start, 'segundos')
Out[ ]:
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
3.550208568572998 segundos

Ejecutando una función mediante un mapeo a una lista

Supongamos que queremos ejecutar la misma función sobre todos los elementos de una lista. Esto lo realizaríamos utilizando un mapeo, en el caso secuencial sería de la siguiente manera:

In[ ]:
start = time()
results = list(map(is_prime, numbers))
end = time()

print(results)
print(end - start, 'segundos')
Out[ ]:
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
24.3707332611084 segundos

Para realizar esta misma acción usando multiprocesadores, tenemos el siguiente código, como podemos ver, es la forma más compacta de realizar la tarea, además, si no colocamos ningún número de procesadores, se usarán todos los procesadores disponibles en nuestra máquina:

In[ ]:
with ProcessPoolExecutor() as executor:
    start = time()
    results = list(executor.map(is_prime, numbers))
    end = time()

print(results)
print(end - start, 'segundos')
Out[ ]:
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
3.5317625999450684 segundos

Análisis de resultados

Para finalizar, veamos cómo afecta el uso de procesadores al tiempo de ejecución de nuestro código. Para esto, definamos la siguiente función cuyo argumento es el número de procesadores a utilizarse:

In[ ]:
def test_primes(n_proc):
    with ProcessPoolExecutor(n_proc) as executor:
        results = list(executor.map(is_prime, numbers))
    return results

Compilando el código anterior, con el número de procesadores variando de 1 a 16, obtenemos el siguiente gráfico:

In[ ]:
import matplotlib.pyplot as plt

times = []
for n_proc in range(1, 17):
    start = time()
    results = test_primes(n_proc)
    end = time()
    times.append(end - start)

plt.plot(range(1, 17), times)
plt.xlabel('Número de procesadores')
plt.ylabel('Tiempo (s)')
plt.show()
Out[]:

Como vemos, el tiempo decrece de manera inversamente proporcional al número de procesadores.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *