企业微信自建应用收发消息

未认证的企业微信自建应用,将云服务器做为中转,本地电脑实现收发消息给企业微信的自建应用的效果。

注:使用

环境:

1.未认证的企业微信,自建应用

2.云服务器ubuntu server 22.04,默认自带python3.12、需安装nginx

3.已备案域名+ssl证书

4.家庭电脑WIN10,无公网IP

流程:

1.登录企业微信后台,创建自建应用、配置消息回调api

2.域名供应商添加A记录到云服务器

3.申请ssl证书

4.云服务器端,配置python脚本

5.云服务器端,配置nginx反代

6.登录企业微信后台,验证消息回调api

7.家庭电脑端,配置python脚本

8.手机企业微信app设置微信接收消息

9.测试

第一步,登录企业微信后台,创建自建应用、配置企业可信IP、配置消息回调api

企业微信后台:https://work.weixin.qq.com/wework_admin/loginpage_wx?from=myhome

需要记录企业ID、应用ID、应用Secret、消息回调api的url、消息回调api的token、消息回调api的EncodingAESKey

注:若第一次使用,建议先用手机app创建免费企业后,再登录企业后台!!!

1.1)登录企业后台,记录企业ID

企业信息查看

1.2)创建应用,并记录应用ID、应用Secret

创建应用

记录自建的应用ID、应用Secret

1.3)配置可企业可信IP

企业可信IP,只有可信任的ip才可以给企业微信发消息。当前需要填写云服务器的IP(云服务器做的代理转发!!!)

1.4)记录消息回调api的url、消息回调api的token、消息回调api的EncodingAESKey

url地址,需要提前规划api接口域名;Tokne和EncodingAESKey可随机也可自定义,需要记录到服务器端验证使用。

注:当前页面不要关闭,保持时需要先启动服务器端的python脚本后,才可用验证保存

第二步,域名供应商添加A记录到云服务器

域名供应商添加A记录到云服务器(当前国内子域名解析到云服务器,也需要去云服务器供应商备案,悲催。。。)

2.1)当前使用子域名wechat.ffing.cn到腾讯云服务器

2.2)nslookup验证解析状态

第三步,申请ssl证书

ssl证书用于自建应用的消息回调接口,必须443

注:ssl证书可以免费版的3个月,也可用付费,自行选择。

我这里使用免费的

第四步,云服务器端,配置python脚本

服务器端脚本负责两件事:

  • 监听企业微信的回调(处理验证、接收加密消息、解密、通过WebSocket转发给本地)。

  • 监听本地电脑的WebSocket连接,接收发送请求,调用企微信API发消息。

项目路径:/opt/wechat

服务端口:5000

4.1)安装python、nginx

4.2)创建python虚拟环境

1
2
3
4
5
6
7
# 创建项目目录
mkdir -p /opt/wechat
cd /opt/wechat

# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate

4.3)安装pip依赖

1
2
3
4
5
# flask: Web框架
# flask-sock: WebSocket支持
# cryptography: 用于企业微信消息加解密
# requests: 调用企微信API
pip install flask flask-sock cryptography requests

4.4)创建python服务端脚本:/opt/wechat/server.py

注:修改17-28行内容为自己的真实配置!!!

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
import os
import json
import time
import hashlib
import base64
import logging
import random
import string
from flask import Flask, request, abort, jsonify
from flask_sock import Sock
from Crypto.Cipher import AES
from cryptography.hazmat.primitives import padding
import requests
import xml.etree.ElementTree as ET

# ================= 配置区域 =================
#配置,企业ID
CORP_ID = "第一步获取的:企业ID"
#配置,应用ID,int类型
AGENT_ID = 第一步获取的:应用ID
#配置,应用秘钥
SECRET = "第一步获取的:应用Secret"
#配置,消息回调api的Token
TOKEN = "第一步获取的:消息回调api的Token"
#配置,消息回调api的EncodingAESKey
ENCODING_AES_KEY_RAW = "第一步获取的:消息回调api的EncodingAESKey"
#配置,客户端WebSocket连接认证
AUTH_TOKEN = "自定义一个客户端连接WebSocket的密码" 
########################################################################



GET_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
SEND_MESSAGE_URL = "https://qyapi.weixin.qq.com/cgi-bin/message/send"

# ================= 初始化 =================
app = Flask(__name__)
sock = Sock(app)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 全局变量
access_token = None
token_expires_at = 0
client_ws_connection = None 

# ================= 工具函数:加解密 (WXBizMsgCrypt) - 已修复 =================
class WXBizMsgCrypt:
    def __init__(self, token, encoding_aes_key, corp_id):
        self.token = token
        self.corp_id = corp_id

        # 【修复点】Base64 解码需要长度是 4 的倍数。
        # 企业微信给的 Key 通常是 43 位,需要补一个 '=' 变成 44 位。
        key = encoding_aes_key
        missing_padding = len(key) % 4
        if missing_padding:
            key += '=' * (4 - missing_padding)

        try:
            self.aes_key = base64.b64decode(key)
        except Exception as e:
            logging.error(f"AES Key decoding failed: {e}")
            raise e

        self.block_size = 32

    def decrypt(self, encrypt_msg, msg_signature, timestamp, nonce):
        signature = self._generate_signature(timestamp, nonce, encrypt_msg)
        if signature != msg_signature:
            raise ValueError("Signature verification failed")

        try:
            aes_cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_key[:16])
            decrypted = aes_cipher.decrypt(base64.b64decode(encrypt_msg))

            # 去除 PKCS7 填充
            unpadder = padding.PKCS7(8 * self.block_size).unpadder()
            unpadded_data = unpadder.update(decrypted) + unpadder.finalize()

            # 解析结构: 16字节随机串 + 4字节长度 + 消息体 + 4字节corp_id
            msg_len = int.from_bytes(unpadded_data[16:20], 'big')
            msg_content = unpadded_data[20:20+msg_len].decode('utf-8')
            recv_corp_id = unpadded_data[20+msg_len:].decode('utf-8')

            if recv_corp_id != self.corp_id:
                raise ValueError("CorpID mismatch")

            return msg_content
        except Exception as e:
            logging.error(f"Decryption error: {e}")
            raise e

    def _generate_signature(self, timestamp, nonce, encrypt_msg):
        tmp_list = [self.token, timestamp, nonce, encrypt_msg]
        tmp_list.sort()
        sha = hashlib.sha1()
        sha.update("".join(tmp_list).encode('utf-8'))
        return sha.hexdigest()

    def encrypt(self, reply_msg, timestamp, nonce):
        random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
        msg_bytes = reply_msg.encode('utf-8')
        msg_len = len(msg_bytes).to_bytes(4, 'big')
        corp_id_bytes = self.corp_id.encode('utf-8')

        raw_data = random_str.encode('utf-8') + msg_len + msg_bytes + corp_id_bytes

        padder = padding.PKCS7(8 * self.block_size).padder()
        padded_data = padder.update(raw_data) + padder.finalize()

        aes_cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_key[:16])
        encrypted = aes_cipher.encrypt(padded_data)
        encrypt_msg = base64.b64encode(encrypted).decode('utf-8')

        signature = self._generate_signature(timestamp, nonce, encrypt_msg)

        return {
            "Encrypt": encrypt_msg,
            "MsgSignature": signature,
            "TimeStamp": timestamp,
            "Nonce": nonce
        }

# 使用原始 Key 初始化,类内部会自动处理 padding
cryptor = WXBizMsgCrypt(TOKEN, ENCODING_AES_KEY_RAW, CORP_ID)

# ================= Token 管理 =================
def get_access_token():
    global access_token, token_expires_at
    if access_token and time.time() < token_expires_at:
        return access_token

    url = f"{GET_TOKEN_URL}?corpid={CORP_ID}&corpsecret={SECRET}"
    try:
        resp = requests.get(url, timeout=5)
        data = resp.json()
        if data.get('errcode') == 0:
            access_token = data['access_token']
            token_expires_at = time.time() + data['expires_in'] - 200
            logging.info(f"New access token obtained.")
            return access_token
        else:
            logging.error(f"Failed to get token: {data}")
            return None
    except Exception as e:
        logging.error(f"Request token error: {e}")
        return None

# ================= WebSocket 路由 (带认证) =================
@sock.route('/ws/local_client')
def local_client(ws):
    global client_ws_connection

    # 1. 获取并验证 Token
    provided_token = request.args.get('token', '')
    if not provided_token or provided_token != AUTH_TOKEN:
        logging.warning(f"非法连接尝试!IP: {request.remote_addr}, Token: {provided_token}")
        try:
            ws.send(json.dumps({"error": "Authentication failed: Invalid token"}))
        except:
            pass
        return 

    logging.info(f"合法客户端已连接 (IP: {request.remote_addr})")
    client_ws_connection = ws

    try:
        while True:
            data = ws.receive()
            if data:
                try:
                    req_data = json.loads(data)
                    send_to_wechat(req_data)
                except json.JSONDecodeError:
                    ws.send(json.dumps({"error": "Invalid JSON"}))
    except Exception as e:
        logging.error(f"WebSocket connection error: {e}")
    finally:
        if client_ws_connection == ws:
            client_ws_connection = None
        logging.info("Client disconnected")

def send_to_wechat(payload):
    token = get_access_token()
    if not token:
        if client_ws_connection:
            client_ws_connection.send(json.dumps({"error": "Server failed to get access token"}))
        return

    full_payload = {
        "touser": payload.get("touser", "@all"),
        "msgtype": payload.get("msgtype", "text"),
        "agentid": AGENT_ID,
        "safe": 0
    }
    msg_type = payload.get("msgtype", "text")
    if msg_type in payload:
        full_payload[msg_type] = payload[msg_type]

    url = f"{SEND_MESSAGE_URL}?access_token={token}"
    try:
        resp = requests.post(url, json=full_payload, timeout=5)
        res_data = resp.json()
        if res_data.get('errcode') != 0:
            logging.error(f"Send message failed: {res_data}")
            if client_ws_connection:
                client_ws_connection.send(json.dumps({"error": f"WeChat API Error: {res_data.get('errmsg')}"}))
        else:
            logging.info(f"Message sent successfully.")
    except Exception as e:
        logging.error(f"Request send message error: {e}")

# ================= 企业微信回调路由 (HTTP) =================
@app.route('/wechat', methods=['GET', 'POST'])
def wechat_callback():
    msg_signature = request.args.get('msg_signature', '')
    timestamp = request.args.get('timestamp', '')
    nonce = request.args.get('nonce', '')
    echostr = request.args.get('echostr', '')

    if request.method == 'GET':
        if not all([msg_signature, timestamp, nonce, echostr]):
            abort(400)
        try:
            decrypt_echostr = cryptor.decrypt(echostr, msg_signature, timestamp, nonce)
            logging.info("Verification successful.")
            return decrypt_echostr
        except Exception as e:
            logging.error(f"Verification failed: {e}")
            abort(403)

    elif request.method == 'POST':
        try:
            root = ET.fromstring(request.data)
            encrypt_node = root.find('Encrypt')
            if encrypt_node is None:
                abort(400)
            encrypt_text = encrypt_node.text

            decrypted_xml_str = cryptor.decrypt(encrypt_text, msg_signature, timestamp, nonce)
            msg_root = ET.fromstring(decrypted_xml_str)

            msg_type = msg_root.find('MsgType').text
            content = msg_root.find('Content').text if msg_root.find('Content') is not None else ""
            from_user = msg_root.find('FromUserName').text
            create_time = msg_root.find('CreateTime').text

            logging.info(f"Received msg from {from_user}: {content}")

            forward_data = {
                "type": "receive",
                "data": {
                    "FromUserName": from_user,
                    "MsgType": msg_type,
                    "Content": content,
                    "Time": create_time
                }
            }

            if client_ws_connection:
                try:
                    client_ws_connection.send(json.dumps(forward_data, ensure_ascii=False))
                except Exception as e:
                    logging.error(f"Failed to forward to local: {e}")
            else:
                logging.warning("No local client connected, message dropped.")

            # 构造回复
            reply_xml = f"<xml><ToUserName><![CDATA[{from_user}]]></ToUserName><FromUserName><![CDATA[{CORP_ID}]]></FromUserName><CreateTime>{int(time.time())}</CreateTime><MsgType><![CDATA[text]]></MsgType><Content><![CDATA[收到]]></Content></xml>"
            encrypt_reply = cryptor.encrypt(reply_xml, timestamp, nonce)

            return json.dumps(encrypt_reply), 200, {'Content-Type': 'application/json'}

        except Exception as e:
            logging.error(f"Process message error: {e}")
            return "error", 200 

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000, debug=False)

4.5)启动python

注:若启动python异常,请排查安装依赖及17-25的配置信息

1
python /opt/wechat/server.py

4.6)验证脚本状态

当脚本启动后,控制台会打印5000端口(也可ss -ntlp,查看端口状态),随时等待企业后台消息回调api接口验证。

注:当企业微信后台,消息api保存信息时会打印验证日志状态码200,若提示openapi失败,一般是加密配置问题。

4.7)创建systemd服务(可选)

建议整个流程(客户端与企业微信应用正常消息收发)测试完成后,再创建systemd服务,做服务自启动。

创建systemd文件:/etc/systemd/system/wechat.service

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
[Unit]
Description=WeChat Bridge Service
After=network.target

[Service]
User=root
Group=root
WorkingDirectory=/opt/wechat
Environment="PATH=/opt/wechat/venv/bin"
ExecStart=/opt/wechat/venv/bin/python server.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

启动服务

1
2
3
4
systemctl daemon-reload
systemctl enable wechat
systemctl start wechat
systemctl status wechat

第五步,云服务器端,配置nginx反代

因为企业微信应用消息回调必须使用https,所以需要nginx进行反代。

5.1)添加nginx配置,我的nginx配置路径:/etc/nginx/conf.d/wechat.443.conf

注:我的nginx是做了server拆分为独立文件,请根据自己nginx配置环境进行更新!!!另请更新自己的ssl路径!!!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
server {
    listen 443 ssl;
    server_name wechat.ffing.cn;

    ssl_certificate /etc/ssl/wechat.ffing.cn.crt;
    ssl_certificate_key /etc/ssl/wechat.ffing.cn.key;

    # SSL 优化配置
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;

    location / {
        proxy_pass http://127.0.0.1:5000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # 关键支持 WebSocket 升级
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        # 增加超时时间防止长连接断开
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }
}

# 可选强制跳转 HTTPS (如果还没做)
server {
    listen 80;
    server_name wechat.ffing.cn;
    return 301 https://$host$request_uri;
}

5.2)重启nginx

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
#apt安装
systemctl restart nginx

#编译安装的nginx重启
##################
NGINX路径/nginx -s reload
#停止
NGINX路径/nginx -s stop
#启动
NGINX路径/nginx

5.3)测试nginx端口

服务器测试命令,查看443端口:ss -ntlp

公网telnet、在线端口测试等

第六步,登录企业微信后台,验证消息回调api

验证码消息回调api信息,信息需与服务器端server.py信息一致。

注:验证前需确定服务器端脚本已正常运行!!!,

第七步,家庭电脑端,配置python脚本

家庭端脚本,

7.1)安装依赖

1
pip install websocket-client

7.2)python脚本,local.py

注:

注:AUTH_TOKEN 为认证密码,请注意修改为自己的密码!!!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import websocket
import json
import threading
import time
import ssl
import certifi
from urllib.parse import urlencode

# ================= 配置区域 =================
#服务端配置的websocket中的路由
BASE_WS_URL = "wss://wechat.ffing.cn/ws/local_client"
#必须与服务器端的 AUTH_TOKEN 完全一致
AUTH_TOKEN = "服务端配置的连接WebSocket密码" 
#发送消息用户ID(从企业通讯录重查找账号即ID)
#@all为全部人群
USER_ID="接收消息用户ID"
#############################################
# 拼接带认证的 URL
WS_URL = f"{BASE_WS_URL}?token={AUTH_TOKEN}"

def on_message(ws, message):
    try:
        data = json.loads(message)
        if data.get('type') == 'receive':
            msg_data = data['data']
            print(f"\n[收到企微消息] 来自: {msg_data['FromUserName']}")
            print(f"类型: {msg_data['MsgType']}")
            print(f"内容: {msg_data['Content']}")
            print("-" * 30)
        elif data.get('error'):
            print(f"\n[服务器错误]: {data['error']}")
            if "Authentication failed" in data['error']:
                print(">>> 认证失败!请检查本地代码中的 AUTH_TOKEN 是否与服务器一致。")
                print(">>> 程序将自动退出。")
                ws.close()
        else:
            print(f"[系统消息]: {message}")
    except json.JSONDecodeError:
        print(f"[原始消息]: {message}")

def on_error(ws, error):
    print(f"[连接错误]: {error}")

def on_close(ws, close_status_code, close_msg):
    print("### 连接已关闭 ###")

def on_open(ws):
    print("### 认证成功,已连接到云服务器 ###")
    print("提示:现在可以输入消息发送给企业微信了。")

def send_loop(ws):
    while True:
        try:
            user_input = input("\n输入要发送给企微的消息 (输入 'q' 退出): ")
            if user_input.lower() == 'q':
                ws.close()
                break
            if user_input.strip():
                payload = {
                    "touser": USER_ID, 
                    "msgtype": "text",
                    "text": {"content": user_input}
                }
                ws.send(json.dumps(payload))
                print(">>> 消息发送请求已提交")
        except Exception as e:
            print(f"输入错误: {e}")

if __name__ == "__main__":
    # 如需调试网络包,可取消下面这行的注释
    # websocket.enableTrace(True) 

    print(f"正在连接: {WS_URL}")
    ws = websocket.WebSocketApp(WS_URL,
                                on_open=on_open,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)

    wst = threading.Thread(target=lambda: ws.run_forever(
        sslopt={"cert_reqs": ssl.CERT_REQUIRED, "ca_certs": certifi.where()}
    ))
    wst.daemon = True
    wst.start()

    # 等待连接建立
    time.sleep(2) 

    if ws.sock and ws.sock.connected:
        send_loop(ws)
    else:
        print("无法连接到服务器。")
        print("可能原因:")
        print("1. 企业微信应用是否添加云服务器IP到企业可信IP")
        print("2. SSL证书问题")
        print("3. AUTH_TOKENT 不匹配被服务器拒绝(查看上方错误日志)")
        time.sleep(5)

7.3)启动python脚本,当前发送消息只有企业微信微信可以接收

服务器ubnutu日志如下:

Windows日志如下:

第八步,手机企业微信app设置微信接收消息

企业微信左上角”三条杠“ --->右下角“设置” ---> "消息通知" --> "仅在企业微信接受消息" ---> 关闭”会话消息“、关闭”应用消息“

设置完成后,即便企业微信没登录,也可以推送消息到个人微信。

第九步,验证