Threading-Beispiel

Aktualisieren und Ausgeben eines Counters:

[1]:
counter = 0

print('Starting up')
for i in range(10):
    counter += 1
    print('The count is %d' % counter)
print('Finishing up')
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up

Beginnt mit Code, der klar und einfach ist und von oben nach unten ausgeführt wird. Er ist einfach zu entwickeln und inkrementell zu testen.

Hinweis: Testet und debuggt eure Anwendung bevor ihr mit Threading beginnt. Threading macht das Debugggen niemals einfacher.

Umwandeln in Funktionen

Im nächsten Schritt wird dann wiederverwendbarer Code als Funktion erstellt:

[2]:
counter = 0

def worker():
    'My job is to increment the counter and print the current count'
    global counter

    counter += 1
    print('The count is %d' % counter)

print('Starting up')
for i in range(10):
    worker()
print('Finishing up')
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up

Multi-Threading

Jetzt können einige Worker-Threads gestartet werden:

[3]:
import threading

counter = 0

def worker():
    'My job is to increment the counter and print the current count'
    global counter

    counter += 1
    print('The count is %d' % counter)

print('Starting up')
for i in range(10):
    threading.Thread(target=worker).start()
print('Finishing up')
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up

Test

Ein einfacher Testlauf gleicht perfekt der ursprünglichen Ausgabe.

Erkennen von Race Conditions

Hinweis: Tests können nicht die Abwesenheit von Fehlern beweisen. Viele interessante Race Conditions zeigen sich nicht in Testumgebungen.

Fuzzing

Fuzzing ist eine Technik um das Erkennen von Race Conditions zu verbessern:

[4]:
import threading, time, random

FUZZ = True

def fuzz():
    if FUZZ:
        time.sleep(random.random())

counter = 0

def worker():
    'My job is to increment the counter and print the current count'
    global counter

    fuzz()
    oldcnt = counter
    fuzz()
    counter = oldcnt + 1
    fuzz()
    print('The count is %d' % counter, end='')
    fuzz()

print('Starting up')
fuzz()
for i in range(10):
    threading.Thread(target=worker).start()
    fuzz()
print('Finishing up')
fuzz()
Starting up
The count is 1The count is 2The count is 2The count is 3The count is 4The count is 4The count is 4The count is 4Finishing up

Diese Technik ist auf relativ kleine Blöcke von Code beschränkt und ist insofern unvollkommen, als sie weiterhin nicht die Abwesenheit von Fehlern beweisen kann. Dennoch können Fuzzed Tests ggf. Race Conditions aufdecken.

Sorgfältiges Threading mit Queues

Dabei sind folgende Regeln zu beachten:

  1. Alle gemeinsamen Ressourcen sollten in genau einem Thread ausgeführt werden. Alle Kommunikation mit diesem Thread sollte mit nur einer atomaren Message Queue erfolgen – in der Regel mit dem Queue-Modul, E-Mail oder Message Queues wie RabbitMQ oder ZeroMQ.

Ressourcen, die diese Technik benötigen sind z.B. globale Variablen, Benutzereingaben, Ausgabegeräte, Dateien, Sockets usw.

  1. Eine Kategorie von Sequenzierungsproblemen besteht im Sicherzustellen, dass Schritt A vor Schritt B ausgeführt wird. Die Lösung besteht darin, beide im selben Thread auszuführen, in dem alle Aktionen nacheinander ablaufen.

  2. Um eine Barriere zu implementieren, die darauf wartet, dass alle parallelen Threads abgeschlossen sind, müssen nur alle Threads mit join() verbunden werden.

  3. Ihr könnt nicht darauf warten, dass Daemon-Threads abgeschlossen werden (sie sind Endlosschleifen); stattdessen solltet ihr join() auf der Queue selbst ausführen, so dass die Aufgaben erst zusammengefügt werden wenn alle Aufgaben in der Queue erledigt sind.

  4. Ihr könnt globale Variablen verwenden um zwischen Funktionen zu kommunizieren, allerdings nur innerhalb eines single-threaded Programm. In einem multi-thread-Programm könnt ihr globale Variablen jedoch nicht verwenden da sie mutable sind. Dann ist die bessere Lösung threading.local(), da sie in einem Thread zwar global ist, jedoch nicht darüber hinaus.

  5. Versucht niemals, einen Thread von außen zu beenden: ihr wisst nie, ob dieser Thread ein Lock einthält. Daher bietet Python auch keinen direkten Mechanismus zum beenden von Threads. Falls ihr dies jedoch mit ctypes versucht, ist dies ein Rezept für Deadlocks.

Wenn wir nun diese Regeln anwenden, sieht unser Code so aus:

[5]:
import threading, queue

counter = 0

counter_queue = queue.Queue()

def counter_manager():
    'I have EXCLUSIVE rights to update the counter variable'
    global counter

    while True:
        increment = counter_queue.get()
        counter += increment
        print_queue.put([
            'The count is %d' % counter,
            ])
        counter_queue.task_done()

t = threading.Thread(target=counter_manager)
t.daemon = True
t.start()
del t

print_queue = queue.Queue()

def print_manager():
    while True:
        job = print_queue.get()
        for line in job:
            print(line)
        print_queue.task_done()

t = threading.Thread(target=print_manager)
t.daemon = True
t.start()
del t

def worker():
    'My job is to increment the counter and print the current count'
    counter_queue.put(1)

print_queue.put(['Starting up'])
worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
for t in worker_threads:
    t.join()

counter_queue.join()
print_queue.put(['Finishing up'])
print_queue.join()
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8The count is 8
The count is 9
The count is 10
Finishing up

Sorgfältiges Threading mit Locks

Wenn wir Threading mit Locks statt mit Queues machen, wirkt der Code noch aufgeräumter:

[6]:
import threading, time, random

counter_lock = threading.Lock()
printer_lock = threading.Lock()

counter = 0

def worker():
    global counter
    with counter_lock:
        counter += 1
        with printer_lock:
            print('The count is %d' % counter)

with printer_lock:
    print('Starting up')

worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
for t in worker_threads:
    t.join()

with printer_lock:
    print('Finishing up')
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up

Schließlich noch einige Hinweise zu Locks:

  1. Locks sind nur sog. Flags, sie verhindern nicht wirklich zuverlässig.

  2. Im Allgemeinen sollten Locks als primitives Hilfsmittel betrachtet werden, das in nicht-trivialen Beispielen schwierig zu verstehen ist. Bei komplexeren Anwendungen sollten also besser atomare Message Queues verwendet werden.

  3. Je mehr Locks gleichzeitig gesetzt sind, desto geringer werden die Vorteile gleichzeitiger Verarbeitung.