山之高,月出小。
月之小,何皎皎。
我有所思之人在远道,一日不见兮我心悄悄。

当你渴望未曾拥有的,不要忘了珍惜你现在拥有的,曾几何时,你现在拥有的,也是当初你梦寐以求的。

原理图:
deepseek_mermaid_20250825_b2d79a.png

流程图:
deepseek_mermaid_20250825_c8512c.png

其详细步骤分解如下:
1、用户点击登录按钮:
用户在一个第三方应用(客户端)上点击“使用微信登录”。

2、重定向到授权服务器:
客户端将用户重定向到微信的授权服务器(Authorization Server),并携带以下关键信息:
response_type=code(表明使用授权码模式)
client_id(第三方应用的标识ID,提前在微信开放平台注册的)
redirect_uri(授权成功后用户被重定向回客户端的地址)
scope(申请的权限范围,如 read_contacts)
state(一个随机生成的字符串,用于防止CSRF攻击)

3、用户认证与授权:
用户在微信的页面上输入用户名和密码进行登录(确保密码不会泄露给第三方应用)。
登录成功后,微信会向用户展示一个授权页面,列出第三方应用申请的权限(如“获取你的好友列表”)。
用户选择是否同意授权。

4、颁发授权码(Authorization Code):
如果用户同意授权,授权服务器会将用户重定向回第一步提供的 redirect_uri,并在URL中附带一个授权码(Code) 和之前传来的 state。
https://client.example.com/callback?code=AUTH_CODE_HERE&state=xyz

5、换取访问令牌(Access Token):
关键步骤:第三方应用的后端服务器(而不是浏览器前端)收到授权码后,会向授权服务器的令牌端点(Token Endpoint)发起一个后台的、服务器到服务器的请求。这个请求包含:
grant_type=authorization_code(声明授权类型)
code(上一步收到的授权码)
redirect_uri(必须与第一步的完全一致)
client_id 和 client_secret(应用密钥,用于证明自己的身份,必须保密!)
这个请求是后端发起的,避免了 client_secret 暴露给前端。

6、颁发访问令牌:
授权服务器验证所有信息:client_id/client_secret 是否匹配、授权码是否有效、重定向URI是否正确。
验证通过后,授权服务器会返回一个 JSON 数据,里面包含:
access_token: 期盼已久的访问令牌。
refresh_token: (可选)刷新令牌。
expires_in: 访问令牌的过期时间(例如 7200 秒)。
token_type: 令牌类型,通常是 Bearer。

7、访问受保护资源:
现在,第三方应用的后端或前端(取决于场景)就可以使用这个 access_token 去向微信的资源服务器请求数据了。
通常在 API 请求的 Authorization 头中加入:Authorization: Bearer <access_token>。
资源服务器会验证这个令牌的有效性(签名、有效期、范围),然后返回请求的数据(如好友列表)。

8、刷新访问令牌(可选):
当 access_token 过期后,客户端可以使用 refresh_token 再次向授权服务器请求一个新的 access_token,而无需用户再次手动授权。

客户端:

#! python3

import asyncio
import time


async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8885)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

async def tcp_echo_client1(message):

    try:
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8885)

        async def read():
            while True:
                data = await reader.readline()
                if not data:
                    break
                
                # message = data.decode().strip()
                print(f'Received: {data.decode()!r}')


        async def write():
            print(f'Send: {message!r}')
            writer.write(message.encode())
            await writer.drain()


        async def ping():
            await asyncio.sleep(5)
            while True:
                print("ping")
                writer.write("ping\n".encode())
                await writer.drain()
                await asyncio.sleep(10)

        await asyncio.gather(read(), write(), ping())

    except ConnectionResetError as e:
        print("断开连接")
        print(e)
    except Exception as e:
        print("异常断开连接")
        print(e)

    # data = await reader.read(100)
    # print(f'Received: {data.decode()!r}')

    # print('Close the connection')
    # writer.close()
    # await writer.wait_closed()


if __name__ == '__main__':
    asyncio.run(tcp_echo_client1("Hello World!\n"))

服务端:

#! python3

import asyncio
import time

async def handle_echo(reader, writer):
    try:
        addr = writer.get_extra_info('peername')
        while True:
            # 5秒接收不到数据就超时
            data = await asyncio.wait_for(reader.readline(), timeout=30.0) 
            if not data:
                print(f"断开连接")
                break
            message = data.decode()

            print(f"Received {message!r} from {addr!r}")
            # print(f"Send: {message!r}")

            if message.strip() == "ping":
                writer.write("pong\n".encode())
            else:
                fromServer = f"from server: {message.strip()}\n"
                writer.write(fromServer.encode())

            await writer.drain()
    except TimeoutError:
        print("超时断连")
        writer.write("超时断开连接\n".encode())
        await writer.drain()
    finally:
        print("Close the connection")
        writer.close()
        await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '0.0.0.0', 8885)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(main())