忘忧君 发布的文章

原理图:
deepseek_mermaid_20250825_b2d79a.png

流程图:
deepseek_mermaid_20250825_c8512c.png

其详细步骤分解如下:
1、用户点击登录按钮:
用户在一个第三方应用(客户端)上点击“使用微信登录”。

2、重定向到授权服务器:
客户端将用户重定向到微信的授权服务器(Authorization Server),并携带以下关键信息:
response_type=code(表明使用授权码模式)
client_id(第三方应用的标识ID,提前在微信开放平台注册的)
redirect_uri(授权成功后用户被重定向回客户端的地址)
scope(申请的权限范围,如 read_contacts)
state(一个随机生成的字符串,用于防止CSRF攻击)

3、用户认证与授权:
用户在微信的页面上输入用户名和密码进行登录(确保密码不会泄露给第三方应用)。
登录成功后,微信会向用户展示一个授权页面,列出第三方应用申请的权限(如“获取你的好友列表”)。
用户选择是否同意授权。

4、颁发授权码(Authorization Code):
如果用户同意授权,授权服务器会将用户重定向回第一步提供的 redirect_uri,并在URL中附带一个授权码(Code) 和之前传来的 state。
https://client.example.com/callback?code=AUTH_CODE_HERE&state=xyz

5、换取访问令牌(Access Token):
关键步骤:第三方应用的后端服务器(而不是浏览器前端)收到授权码后,会向授权服务器的令牌端点(Token Endpoint)发起一个后台的、服务器到服务器的请求。这个请求包含:
grant_type=authorization_code(声明授权类型)
code(上一步收到的授权码)
redirect_uri(必须与第一步的完全一致)
client_id 和 client_secret(应用密钥,用于证明自己的身份,必须保密!)
这个请求是后端发起的,避免了 client_secret 暴露给前端。

6、颁发访问令牌:
授权服务器验证所有信息:client_id/client_secret 是否匹配、授权码是否有效、重定向URI是否正确。
验证通过后,授权服务器会返回一个 JSON 数据,里面包含:
access_token: 期盼已久的访问令牌。
refresh_token: (可选)刷新令牌。
expires_in: 访问令牌的过期时间(例如 7200 秒)。
token_type: 令牌类型,通常是 Bearer。

7、访问受保护资源:
现在,第三方应用的后端或前端(取决于场景)就可以使用这个 access_token 去向微信的资源服务器请求数据了。
通常在 API 请求的 Authorization 头中加入:Authorization: Bearer <access_token>。
资源服务器会验证这个令牌的有效性(签名、有效期、范围),然后返回请求的数据(如好友列表)。

8、刷新访问令牌(可选):
当 access_token 过期后,客户端可以使用 refresh_token 再次向授权服务器请求一个新的 access_token,而无需用户再次手动授权。

客户端:

#! python3

import asyncio
import time


async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8885)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

async def tcp_echo_client1(message):

    try:
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8885)

        async def read():
            while True:
                data = await reader.readline()
                if not data:
                    break
                
                # message = data.decode().strip()
                print(f'Received: {data.decode()!r}')


        async def write():
            print(f'Send: {message!r}')
            writer.write(message.encode())
            await writer.drain()


        async def ping():
            await asyncio.sleep(5)
            while True:
                print("ping")
                writer.write("ping\n".encode())
                await writer.drain()
                await asyncio.sleep(10)

        await asyncio.gather(read(), write(), ping())

    except ConnectionResetError as e:
        print("断开连接")
        print(e)
    except Exception as e:
        print("异常断开连接")
        print(e)

    # data = await reader.read(100)
    # print(f'Received: {data.decode()!r}')

    # print('Close the connection')
    # writer.close()
    # await writer.wait_closed()


if __name__ == '__main__':
    asyncio.run(tcp_echo_client1("Hello World!\n"))

服务端:

#! python3

import asyncio
import time

async def handle_echo(reader, writer):
    try:
        addr = writer.get_extra_info('peername')
        while True:
            # 5秒接收不到数据就超时
            data = await asyncio.wait_for(reader.readline(), timeout=30.0) 
            if not data:
                print(f"断开连接")
                break
            message = data.decode()

            print(f"Received {message!r} from {addr!r}")
            # print(f"Send: {message!r}")

            if message.strip() == "ping":
                writer.write("pong\n".encode())
            else:
                fromServer = f"from server: {message.strip()}\n"
                writer.write(fromServer.encode())

            await writer.drain()
    except TimeoutError:
        print("超时断连")
        writer.write("超时断开连接\n".encode())
        await writer.drain()
    finally:
        print("Close the connection")
        writer.close()
        await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '0.0.0.0', 8885)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(main())









#! python3

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

    # return f"task done: {what}"


async def main():
    print(f"started at {time.strftime('%X')}")


    # await say_after(1, 'hello')
    # await say_after(2, 'world')
    # print(f"finished at {time.strftime('%X')}")

    # 总执行时间≈2秒(由最慢的任务决定)
    # task1 = asyncio.create_task(say_after(1, 'hello'))
    # task2 = asyncio.create_task(say_after(2, 'world'))
    # await task1
    # await task2
    # print(f"finished at {time.strftime('%X')}") 

    # task管理器 当存在上下文管理器时 await 是隐式执行的。
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(say_after(1, 'hello'))
        task2 = tg.create_task(say_after(2, 'world'))

    print(f"finished at {time.strftime('%X')}")


async def taskCallback():

    task = asyncio.create_task(say_after(1, 'hello'))
    task.add_done_callback(lambda t: print("Task is done! Result: {!r}".format(t.result())))
    await task




class TaskCancelException(Exception):
    """Exception raised to terminate a task group."""

async def force_cancel_task():
    raise TaskCancelException()

async def taskGroupCancel():

    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(say_after(1, 'hello'))
            task2 = tg.create_task(say_after(2, 'world'))

            print(f"started at {time.strftime('%X')}")
            await asyncio.sleep(1)

            tg.create_task(force_cancel_task()) # 异步任务组取消
    except* TaskCancelException:
        print("Task is canceled!")

async def taskCancel():
    print(f"started main at {time.strftime('%X')}")
    task = asyncio.create_task(say_after(5, 'hello'))
    print(f"started at {time.strftime('%X')}")
    await asyncio.sleep(1)
    task.cancel()
    # task.cancel() 并不会立即终止任务,而是会在下一个 await 或挂起点 抛出 asyncio.CancelledError 异常
    try:
        await task
    except asyncio.CancelledError:
        print("main task is canceled!")

    print(f"finished at {time.strftime('%X')}")

async def taskTimeout():
    try:
        print(f"started at {time.strftime('%X')}")
        async with asyncio.timeout(5):
            await asyncio.sleep(10)
    except TimeoutError:
        print("Task is timeout!")
        print(f"finished at {time.strftime('%X')}")

async def taskTimeoutAt():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 5
    print(f"started at {time.strftime('%X')}")
    print(deadline)
    try:
        async with asyncio.timeout_at(deadline):
            await asyncio.sleep(10)
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")
        print(f"finished at {time.strftime('%X')}")


async def eternity():
    # 休眠一小时
    await asyncio.sleep(10)
    print('yay!')

async def taskWaitFor():
    # 等待至多 5 秒
    try:
        print(f"started at {time.strftime('%X')}")
        await asyncio.wait_for(eternity(), timeout=5.0)
    except TimeoutError:
        print('timeout!')
        print(f"finished at {time.strftime('%X')}")



def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # 请注意 time.sleep() 可被替换为任意一种
    # 阻塞式 IO 密集型操作,例如文件操作。
    time.sleep(5)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def taskExcute1():
    # print(f"started main at {time.strftime('%X')}")
    await asyncio.sleep(2)
    print(f"finished task1 at {time.strftime('%X')}")

async def taskExcuteInThread():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        taskExcute1())

    print(f"finished main at {time.strftime('%X')}")



async def task1(i):
    await asyncio.sleep(i)
    print(f"task {i} is done")
    return f'the result {i}'

async def taskWaitAllComplete():
    print(f"started main at {time.strftime('%X')}")

    # 多个并发任务,完成时间取决于最慢的任务
    tasks = [asyncio.create_task(task1(i)) for i in [1, 2, 3]]

    # done, pending = await asyncio.wait(tasks, timeout=2.0, return_when=asyncio.ALL_COMPLETED) # 设置超时时间
    done, pending = await asyncio.wait(tasks, timeout=None, return_when=asyncio.ALL_COMPLETED) # 不设置超时时间
    
    for d in done:
        print(d.result())

    for p in pending:
        print("超时未完成task")
        print(p)

    print(f"finished main at {time.strftime('%X')}")

async def taskWaitOneComplete():
    print(f"started main at {time.strftime('%X')}")

    # 多个并发任务,完成时间取决于最慢的任务
    tasks = [asyncio.create_task(task1(i)) for i in [1, 2, 3]]

    # done, pending = await asyncio.wait(tasks, timeout=2.0, return_when=asyncio.ALL_COMPLETED) # 设置超时时间
    done, pending = await asyncio.wait(tasks, timeout=None, return_when=asyncio.FIRST_COMPLETED) # 不设置超时时间
    
    for d in done:
        print(d.result())

    for p in pending:
        print("超时未完成task")
        print(p)
        p.cancel()

    print(f"finished main at {time.strftime('%X')}")




async def taskLock(name):
    # 创建一个锁
    async with lock:
        print(f"started task_{name} at {time.strftime('%X')}")
        print(f"lock acquired at {time.strftime('%X')}")
        await asyncio.sleep(1)
        print(f"lock released at {time.strftime('%X')}")


async def runTaskLock():

    async with asyncio.TaskGroup() as tg:
        tg.create_task(taskLock("A"))
        tg.create_task(taskLock("B"))
        tg.create_task(taskLock("C"))

    print("All tasks are done")



async def taskEvent(event):
    print(f"waiting event at {time.strftime('%X')}")
    await event.wait()
    print(f"done event at {time.strftime('%X')}")

async def taskEventSet(event):
    await asyncio.sleep(5)
    event.set()

async def runTaskEvent():
    event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(taskEvent(event))
        tg.create_task(taskEvent(event))
        tg.create_task(taskEvent(event))
        tg.create_task(taskEventSet(event))


async def taskConditionProducer(condition):
    async with condition:
        await asyncio.sleep(2)
        print(f"condition nofify at {time.strftime('%X')}")
        # condition.notify()
        condition.notify_all()
        # for i in range(10):
            # await asyncio.sleep(1)
            # condition.notify()

async def taskConditionConsumer(condition, name):
    async with condition:
        while True:
            print(f"{name} waiting condition at {time.strftime('%X')}")
            await condition.wait()
            print(f"{name} done condition at {time.strftime('%X')}")
            break
        # print(f"waiting condition at {time.strftime('%X')}")
        # await condition.wait()
        # print(f"done condition at {time.strftime('%X')}")

async def runTaskCondition():
    condition = asyncio.Condition()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(taskConditionConsumer(condition, "A"))
        tg.create_task(taskConditionConsumer(condition, "B"))
        tg.create_task(taskConditionConsumer(condition, "C"))
        tg.create_task(taskConditionConsumer(condition, "D"))
        tg.create_task(taskConditionConsumer(condition, "E"))
        tg.create_task(taskConditionConsumer(condition, "F"))
        tg.create_task(taskConditionConsumer(condition, "G"))
        tg.create_task(taskConditionProducer(condition))
        

async def taskSemaphore(semaphore, i):
    async with semaphore:
        print(f"started task_{i} at {time.strftime('%X')}")
        print(f"semaphore acquired at {time.strftime('%X')}")
        await asyncio.sleep(1)
        print(f"semaphore released at {time.strftime('%X')}")

async def runTaskSemaphore():
    # 信号量, 限制最多3个并发任务
    semaphore = asyncio.Semaphore(3)
    async with asyncio.TaskGroup() as tg:
        for i in range(10):
            tg.create_task(taskSemaphore(semaphore, i))



async def taskBarrier(barrier, i):
    print(f"started task_{i} at {time.strftime('%X')}")
    await asyncio.sleep(1 + i * 0.5)
    await barrier.wait()
    print(f"done task_{i} at {time.strftime('%X')}")


async def runTaskBarrier():

    # 任务屏障, 限制最多3个并发任务
    barrier = asyncio.Barrier(3)
    async with asyncio.TaskGroup() as tg:
        for i in range(3):
            tg.create_task(taskBarrier(barrier, i))


async def runTaskBarrier1():

    # 任务屏障, 限制最多3个并发任务
    barrier = asyncio.Barrier(3)

    asyncio.create_task(taskBarrier(barrier, 0))
    asyncio.create_task(taskBarrier(barrier, 1))

    await asyncio.sleep(2)

    await barrier.wait()
    print("All tasks are done")



async def taskQueueProducer(queue):
    for i in range(20):
        await queue.put(i)
        print(f"put {i} to queue")
        await asyncio.sleep(1)

    await queue.join()
    print("All items have been processed")

    raise TaskDoneException()


async def taskQueueConsumer(queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(1)
        queue.task_done()
        print(f"get {item} from queue")

class TaskDoneException(Exception):
    """Exception raised to terminate a task group."""

async def runTaskQueue():
    queue = asyncio.Queue()
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(taskQueueProducer(queue))
            tg.create_task(taskQueueConsumer(queue))
            tg.create_task(taskQueueConsumer(queue))
    except* TaskDoneException:
        print("Tasks is done!")



if __name__ == '__main__':
    # asyncio.run(main())

    # 任务回调
    # asyncio.run(taskCallback())

    # 任务组取消 (模拟抛异常终结)
    # asyncio.run(taskGroupCancel())

    # 任务取消 (模拟正常终结)
    # asyncio.run(taskCancel())

    # 任务超时异常
    # asyncio.run(taskTimeout())

    # 任务超时异常
    # asyncio.run(taskTimeoutAt())

    # 任务超时异常
    # asyncio.run(taskWaitFor())

    # 针对于阻塞IO密集型操作,在新线程调用运行
    # asyncio.run(taskExcuteInThread())


    # 任务简单等待 (不会抛超时异常, 分别返回完成、未完成任务future)
    # asyncio.run(taskWaitAllComplete())

    # 任务简单等待 (会抛超时异常, 分别返回完成、未完成任务future)
    # asyncio.run(taskWaitOneComplete())


    # lock = asyncio.Lock()
    # # 任务锁
    # asyncio.run(runTaskLock())

    # 任务事件event 协程之间的信号传递, 事件可被用来通知多个 asyncio 任务已经有事件发生。
    # asyncio.run(runTaskEvent())


    # 任务条件condition
    # asyncio.run(runTaskCondition())

    # 信号量, 限制最多3个并发任务
    # asyncio.run(runTaskSemaphore())

    # 任务屏障, 限制最多3个并发任务
    # asyncio.run(runTaskBarrier())
    # asyncio.run(runTaskBarrier1())

    # 任务队列 (生产者-消费者模式) 任务完成后自动退出
    asyncio.run(runTaskQueue())














1、NGINX + websocket负载均衡

http {
    upstream websocket_cluster {
        # Workerman 节点地址,可多台机器或多个端口
        server 192.168.1.10:7272;
        server 192.168.1.11:7272;
        server 192.168.1.12:7272;
    }

    server {
        listen 80;
        server_name yourdomain.com;

        location /ws/ {
            proxy_pass http://websocket_cluster;

            # WebSocket 必要头部
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # 防止超时
            proxy_read_timeout 60s;
            proxy_send_timeout 60s;
        }
    }
}

运行task进程:

use \Workerman\Worker;
use \Workerman\Connection\TcpConnection;

// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:12345');
// task进程数可以根据需要多开一些
$task_worker->count = 10;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{

    echo "接收到任务: " . $task_data . PHP_EOL;

    // 假设发来的是json数据
    $task_data = json_decode($task_data, true);
     // 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....
    //  $task_result = ......

    sleep(3);
    $task_result = array(
        'task_id' => $task_data['task_id'],
        'result' => 'task result',
    );

    // 发送结果
    $connection->send(json_encode($task_result));
};

Worker::runAll();

- 阅读剩余部分 -