Client:

#! python3

import asyncio
import time


async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8885)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

async def tcp_echo_client1(message):

    try:
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8885)

        async def read():
            while True:
                data = await reader.readline()
                if not data:
                    break
                
                # message = data.decode().strip()
                print(f'Received: {data.decode()!r}')


        async def write():
            print(f'Send: {message!r}')
            writer.write(message.encode())
            await writer.drain()


        async def ping():
            await asyncio.sleep(5)
            while True:
                print("ping")
                writer.write("ping\n".encode())
                await writer.drain()
                await asyncio.sleep(10)

        await asyncio.gather(read(), write(), ping())

    except ConnectionResetError as e:
        print("Connection reset by peer")
        print(e)
    except Exception as e:
        print("Connection closed by an unexpected error")
        print(e)

    # data = await reader.read(100)
    # print(f'Received: {data.decode()!r}')

    # print('Close the connection')
    # writer.close()
    # await writer.wait_closed()


if __name__ == '__main__':
    asyncio.run(tcp_echo_client1("Hello World!\n"))

Server:

#! python3

import asyncio
import time

async def handle_echo(reader, writer):
    try:
        addr = writer.get_extra_info('peername')
        while True:
            # Time out if no data is received within 30 seconds
            data = await asyncio.wait_for(reader.readline(), timeout=30.0) 
            if not data:
                print(f"断开连接")
                break
            message = data.decode()

            print(f"Received {message!r} from {addr!r}")
            # print(f"Send: {message!r}")

            if message.strip() == "ping":
                writer.write("pong\n".encode())
            else:
                fromServer = f"from server: {message.strip()}\n"
                writer.write(fromServer.encode())

            await writer.drain()
    except TimeoutError:
        print("超时断连")
        writer.write("超时断开连接\n".encode())
        await writer.drain()
    finally:
        print("Close the connection")
        writer.close()
        await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '0.0.0.0', 8885)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(main())









#! python3

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

    # return f"task done: {what}"


async def main():
    print(f"started at {time.strftime('%X')}")


    # await say_after(1, 'hello')
    # await say_after(2, 'world')
    # print(f"finished at {time.strftime('%X')}")

    # Total runtime ≈ 2 seconds (bounded by the slowest task)
    # task1 = asyncio.create_task(say_after(1, 'hello'))
    # task2 = asyncio.create_task(say_after(2, 'world'))
    # await task1
    # await task2
    # print(f"finished at {time.strftime('%X')}") 

    # TaskGroup: the tasks are awaited implicitly when the context manager exits.
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(say_after(1, 'hello'))
        task2 = tg.create_task(say_after(2, 'world'))

    print(f"finished at {time.strftime('%X')}")


async def taskCallback():

    task = asyncio.create_task(say_after(1, 'hello'))
    task.add_done_callback(lambda t: print("Task is done! Result: {!r}".format(t.result())))
    await task




class TaskCancelException(Exception):
    """Exception raised to terminate a task group."""

async def force_cancel_task():
    raise TaskCancelException()

async def taskGroupCancel():

    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(say_after(1, 'hello'))
            task2 = tg.create_task(say_after(2, 'world'))

            print(f"started at {time.strftime('%X')}")
            await asyncio.sleep(1)

            tg.create_task(force_cancel_task())  # raising inside the group cancels its remaining tasks
    except* TaskCancelException:
        print("Task is canceled!")

async def taskCancel():
    print(f"started main at {time.strftime('%X')}")
    task = asyncio.create_task(say_after(5, 'hello'))
    print(f"started at {time.strftime('%X')}")
    await asyncio.sleep(1)
    task.cancel()
    # task.cancel() does not stop the task immediately; asyncio.CancelledError is raised at the task's next await (suspension point)
    try:
        await task
    except asyncio.CancelledError:
        print("main task is canceled!")

    print(f"finished at {time.strftime('%X')}")

async def taskTimeout():
    try:
        print(f"started at {time.strftime('%X')}")
        async with asyncio.timeout(5):
            await asyncio.sleep(10)
    except TimeoutError:
        print("Task is timeout!")
        print(f"finished at {time.strftime('%X')}")

async def taskTimeoutAt():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 5
    print(f"started at {time.strftime('%X')}")
    print(deadline)
    try:
        async with asyncio.timeout_at(deadline):
            await asyncio.sleep(10)
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")
        print(f"finished at {time.strftime('%X')}")


async def eternity():
    # Sleep for 10 seconds (long enough to exceed the 5-second timeout below)
    await asyncio.sleep(10)
    print('yay!')

async def taskWaitFor():
    # Wait for at most 5 seconds
    try:
        print(f"started at {time.strftime('%X')}")
        await asyncio.wait_for(eternity(), timeout=5.0)
    except TimeoutError:
        print('timeout!')
        print(f"finished at {time.strftime('%X')}")



def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note: time.sleep() stands in for any blocking, IO-bound
    # operation, such as file IO.
    time.sleep(5)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def taskExcute1():
    # print(f"started main at {time.strftime('%X')}")
    await asyncio.sleep(2)
    print(f"finished task1 at {time.strftime('%X')}")

async def taskExcuteInThread():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        taskExcute1())

    print(f"finished main at {time.strftime('%X')}")



async def task1(i):
    await asyncio.sleep(i)
    print(f"task {i} is done")
    return f'the result {i}'

async def taskWaitAllComplete():
    print(f"started main at {time.strftime('%X')}")

    # Several concurrent tasks; total time is bounded by the slowest one
    tasks = [asyncio.create_task(task1(i)) for i in [1, 2, 3]]

    # done, pending = await asyncio.wait(tasks, timeout=2.0, return_when=asyncio.ALL_COMPLETED) # with a timeout
    done, pending = await asyncio.wait(tasks, timeout=None, return_when=asyncio.ALL_COMPLETED) # without a timeout
    
    for d in done:
        print(d.result())

    for p in pending:
        print("超时未完成task")
        print(p)

    print(f"finished main at {time.strftime('%X')}")

async def taskWaitOneComplete():
    print(f"started main at {time.strftime('%X')}")

    # Several concurrent tasks; total time is bounded by the slowest one
    tasks = [asyncio.create_task(task1(i)) for i in [1, 2, 3]]

    # done, pending = await asyncio.wait(tasks, timeout=2.0, return_when=asyncio.ALL_COMPLETED) # with a timeout
    done, pending = await asyncio.wait(tasks, timeout=None, return_when=asyncio.FIRST_COMPLETED) # without a timeout
    
    for d in done:
        print(d.result())

    for p in pending:
        print("超时未完成task")
        print(p)
        p.cancel()

    print(f"finished main at {time.strftime('%X')}")




async def taskLock(name):
    # Acquire the module-level lock; only one task runs this block at a time
    async with lock:
        print(f"started task_{name} at {time.strftime('%X')}")
        print(f"lock acquired at {time.strftime('%X')}")
        await asyncio.sleep(1)
        print(f"lock released at {time.strftime('%X')}")


async def runTaskLock():

    async with asyncio.TaskGroup() as tg:
        tg.create_task(taskLock("A"))
        tg.create_task(taskLock("B"))
        tg.create_task(taskLock("C"))

    print("All tasks are done")



async def taskEvent(event):
    print(f"waiting event at {time.strftime('%X')}")
    await event.wait()
    print(f"done event at {time.strftime('%X')}")

async def taskEventSet(event):
    await asyncio.sleep(5)
    event.set()

async def runTaskEvent():
    event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(taskEvent(event))
        tg.create_task(taskEvent(event))
        tg.create_task(taskEvent(event))
        tg.create_task(taskEventSet(event))


async def taskConditionProducer(condition):
    async with condition:
        await asyncio.sleep(2)
        print(f"condition nofify at {time.strftime('%X')}")
        # condition.notify()
        condition.notify_all()
        # for i in range(10):
            # await asyncio.sleep(1)
            # condition.notify()

async def taskConditionConsumer(condition, name):
    async with condition:
        while True:
            print(f"{name} waiting condition at {time.strftime('%X')}")
            await condition.wait()
            print(f"{name} done condition at {time.strftime('%X')}")
            break
        # print(f"waiting condition at {time.strftime('%X')}")
        # await condition.wait()
        # print(f"done condition at {time.strftime('%X')}")

async def runTaskCondition():
    condition = asyncio.Condition()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(taskConditionConsumer(condition, "A"))
        tg.create_task(taskConditionConsumer(condition, "B"))
        tg.create_task(taskConditionConsumer(condition, "C"))
        tg.create_task(taskConditionConsumer(condition, "D"))
        tg.create_task(taskConditionConsumer(condition, "E"))
        tg.create_task(taskConditionConsumer(condition, "F"))
        tg.create_task(taskConditionConsumer(condition, "G"))
        tg.create_task(taskConditionProducer(condition))
        

async def taskSemaphore(semaphore, i):
    async with semaphore:
        print(f"started task_{i} at {time.strftime('%X')}")
        print(f"semaphore acquired at {time.strftime('%X')}")
        await asyncio.sleep(1)
        print(f"semaphore released at {time.strftime('%X')}")

async def runTaskSemaphore():
    # Semaphore: allow at most 3 tasks to run concurrently
    semaphore = asyncio.Semaphore(3)
    async with asyncio.TaskGroup() as tg:
        for i in range(10):
            tg.create_task(taskSemaphore(semaphore, i))



async def taskBarrier(barrier, i):
    print(f"started task_{i} at {time.strftime('%X')}")
    await asyncio.sleep(1 + i * 0.5)
    await barrier.wait()
    print(f"done task_{i} at {time.strftime('%X')}")


async def runTaskBarrier():

    # Barrier with 3 parties: each task blocks at wait() until all 3 have arrived
    barrier = asyncio.Barrier(3)
    async with asyncio.TaskGroup() as tg:
        for i in range(3):
            tg.create_task(taskBarrier(barrier, i))


async def runTaskBarrier1():

    # Barrier with 3 parties: two tasks plus this coroutine itself
    barrier = asyncio.Barrier(3)

    asyncio.create_task(taskBarrier(barrier, 0))
    asyncio.create_task(taskBarrier(barrier, 1))

    await asyncio.sleep(2)

    await barrier.wait()
    print("All tasks are done")



async def taskQueueProducer(queue):
    for i in range(20):
        await queue.put(i)
        print(f"put {i} to queue")
        await asyncio.sleep(1)

    await queue.join()
    print("All items have been processed")

    raise TaskDoneException()


async def taskQueueConsumer(queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(1)
        queue.task_done()
        print(f"get {item} from queue")

class TaskDoneException(Exception):
    """Exception raised to terminate a task group."""

async def runTaskQueue():
    queue = asyncio.Queue()
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(taskQueueProducer(queue))
            tg.create_task(taskQueueConsumer(queue))
            tg.create_task(taskQueueConsumer(queue))
    except* TaskDoneException:
        print("Tasks is done!")



if __name__ == '__main__':
    # asyncio.run(main())

    # Task done-callback
    # asyncio.run(taskCallback())

    # Cancel a task group (terminated by raising an exception inside the group)
    # asyncio.run(taskGroupCancel())

    # Cancel a single task (normal cancellation path)
    # asyncio.run(taskCancel())

    # Task timeout via asyncio.timeout()
    # asyncio.run(taskTimeout())

    # Task timeout via asyncio.timeout_at()
    # asyncio.run(taskTimeoutAt())

    # Task timeout via asyncio.wait_for()
    # asyncio.run(taskWaitFor())

    # Run blocking, IO-bound work in a separate thread
    # asyncio.run(taskExcuteInThread())


    # Simple wait for all tasks (asyncio.wait never raises TimeoutError; it returns the done and pending futures separately)
    # asyncio.run(taskWaitAllComplete())

    # Simple wait for the first completed task (likewise no TimeoutError; the pending futures are returned and cancelled here)
    # asyncio.run(taskWaitOneComplete())


    # lock = asyncio.Lock()
    # # Task lock
    # asyncio.run(runTaskLock())

    # Event: signalling between coroutines; an event can notify multiple asyncio tasks that something has happened
    # asyncio.run(runTaskEvent())


    # Condition: wait/notify between coroutines
    # asyncio.run(runTaskCondition())

    # Semaphore: at most 3 concurrent tasks
    # asyncio.run(runTaskSemaphore())

    # Barrier: tasks rendezvous once all parties have arrived
    # asyncio.run(runTaskBarrier())
    # asyncio.run(runTaskBarrier1())

    # Queue (producer-consumer pattern); exits automatically once all items have been processed
    asyncio.run(runTaskQueue())














1. NGINX + WebSocket load balancing

http {
    upstream websocket_cluster {
        # Workerman node addresses; multiple machines or multiple ports are fine
        server 192.168.1.10:7272;
        server 192.168.1.11:7272;
        server 192.168.1.12:7272;
    }

    server {
        listen 80;
        server_name yourdomain.com;

        location /ws/ {
            proxy_pass http://websocket_cluster;

            # Required WebSocket headers
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Keep long-lived connections from timing out
            proxy_read_timeout 60s;
            proxy_send_timeout 60s;
        }
    }
}
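
A quick way to confirm that NGINX is forwarding the Upgrade handshake is to connect through the proxy from a small client. The sketch below is only an assumption-laden example: it relies on the third-party websockets package (pip install websockets), and yourdomain.com plus the "hello" payload are placeholders for whatever the Workerman nodes actually expect on /ws/.

#! python3

import asyncio
import websockets  # third-party dependency

async def check_proxy():
    # Connect through the NGINX front end, not a backend node directly
    async with websockets.connect('ws://yourdomain.com/ws/') as ws:
        await ws.send('hello')
        reply = await ws.recv()
        print(f'Received: {reply!r}')

if __name__ == '__main__':
    asyncio.run(check_proxy())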

Run the task worker process:

use \Workerman\Worker;
use \Workerman\Connection\TcpConnection;

// Task worker, using the Text protocol
$task_worker = new Worker('Text://0.0.0.0:12345');
// The number of task processes can be raised as needed
$task_worker->count = 10;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{

    echo "接收到任务: " . $task_data . PHP_EOL;

    // Assume the incoming payload is JSON
    $task_data = json_decode($task_data, true);
    // Process the task according to task_data and build the result (omitted here)
    //  $task_result = ......

    sleep(3);
    $task_result = array(
        'task_id' => $task_data['task_id'],
        'result' => 'task result',
    );

    // Send the result back
    $connection->send(json_encode($task_result));
};

Worker::runAll();
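
Workerman's Text protocol frames every message as a single line terminated by "\n", so any TCP client can submit work to the task worker. Below is a minimal asyncio sketch under that assumption; 127.0.0.1:12345 matches the worker above, while the task_id and data fields are illustrative only.

#! python3

import asyncio
import json

async def submit_task():
    reader, writer = await asyncio.open_connection('127.0.0.1', 12345)

    # Text protocol: a JSON payload followed by a newline
    task = {'task_id': 1, 'data': 'do something'}
    writer.write((json.dumps(task) + "\n").encode())
    await writer.drain()

    # The task worker answers with one JSON line
    line = await reader.readline()
    print(f"Task result: {json.loads(line.decode())}")

    writer.close()
    await writer.wait_closed()

if __name__ == '__main__':
    asyncio.run(submit_task())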


from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
import os
import random
import threading

# Shared flag used to ask the long-running task2 workers to stop
exit_event = threading.Event()

def task(name):

    print(f"Hello from task {name} started (PID: {os.getpid()})")
    time.sleep(2)
    print(f"Task {name} finished")
    return f"Result of {name}"

def task1(name):

    print(f"Hello from task {name} started (PID: {os.getpid()})")
    time.sleep(random.randint(1, 4))
    print(f"Task {name} finished")
    return f"Result of {name}"

def task2(name):

    print(f"Hello from task {name} started (PID: {os.getpid()})")
    while True:
        if exit_event.is_set():
            print("Task interrupted...")
            break
        time.sleep(random.randint(1, 4))
        print(f"Task {name} running...")

if __name__ == '__main__':

    print(f"Main process (PID: {os.getpid()})")

    # with ThreadPoolExecutor(max_workers=3) as executor:
    #     future1 = executor.submit(task, "A")
    #     future2 = executor.submit(task, "B")
    #     future3 = executor.submit(task, "C")

    #     print(future1.result())
    #     print(future2.result())
    #     print(future3.result())


    # Thread pool version
    # with ThreadPoolExecutor(max_workers=3) as executor:

    #     futures = [executor.submit(task1, f"task_{i}") for i in range(5)]
    #     # Collect results in completion order
    #     for future in as_completed(futures):
    #         print(future.result())

    # Process pool version (backed by the multiprocessing module)
    with ProcessPoolExecutor(max_workers=3) as executor:

        futures = [executor.submit(task1, f"task_{i}") for i in range(5)]
        # Collect results in completion order
        for future in as_completed(futures):
            print(future.result())
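
task2 and exit_event are defined above but never driven. Here is a small sketch of how they could be wired up inside the same __main__ block with a thread pool (threads in one process share exit_event); the 10-second run time is an arbitrary choice for illustration:

    # Cooperative shutdown of the long-running task2 workers
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(task2, f"task_{i}") for i in range(2)]
        time.sleep(10)       # let the workers run for a while
        exit_event.set()     # ask every task2 loop to break out
        for future in as_completed(futures):
            future.result()  # surface any exception raised in a worker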