DocuSign电子签API集成实战:批量发送信封与Webhook回调处理
引言
DocuSign作为电子签名领域的领先平台,近年来正从单一的电子签工具向IAM(智能协议管理)方向演进——2026年的产品路线图显示,自动化工作流和AI辅助审阅已成为其核心能力扩展方向。对于开发者而言,深入掌握DocuSign API的集成方法,是构建自动化签署流程的关键能力。
本文将以Python和Node.js双语言示例,覆盖从OAuth2.0认证、创建信封、批量发送,到Webhook回调处理和签署状态查询的完整链路。所有API概念基于官方文档,代码中的凭证信息使用占位符。
官方文档参考:DocuSign Developer Center
环境准备
开通开发者沙箱
首先需要在DocuSign开发者中心注册账号并创建沙箱应用,获取以下凭证:
Integration Key(客户端ID)RSA Key Pair(用于JWT认证)Account ID(沙箱账户标识)
安装依赖
Python环境:
pip install docusign-esign requests flaskNode.js环境:
npm install docusign-esign-sdk express axiosOAuth2.0 JWT认证
DocuSign支持多种认证方式,其中JWT Grant适合服务端对服务端的自动化场景——无需用户交互即可获取访问令牌。
Python JWT认证实现
# docusign_auth.py import jwt import time import requests class DocuSignAuth: """ DocuSign JWT OAuth2.0 认证模块 适用于服务端自动化场景,无需用户手动授权 """ def __init__(self, integration_key, user_id, rsa_private_key, account_id): self.integration_key = integration_key self.user_id = user_id self.rsa_private_key = rsa_private_key self.account_id = account_id self.base_url = "https://account-d.docusign.com" # 沙箱环境 def get_access_token(self): """ 生成JWT并换取access_token DocuSign要求JWT包含固定的scope和aud声明 """ now = int(time.time()) jwt_payload = { "iss": self.integration_key, "sub": self.user_id, "aud": "account-d.docusign.com", "iat": now, "exp": now + 3600, # 1小时有效期 "scope": "signature impersonation" } # 使用RSA私钥签名JWT jwt_token = jwt.encode(jwt_payload, self.rsa_private_key, algorithm="RS256") # 换取access_token response = requests.post( f"{self.base_url}/oauth/token", data={ "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", "assertion": jwt_token } ) token_data = response.json() # 更新base_url为实际的账户环境地址 self.base_url = token_data["accounts"][0]["base_uri"] return token_data["access_token"]Node.js JWT认证实现
// docusignAuth.js const jwt = require('jsonwebtoken'); const axios = require('axios'); class DocuSignAuth { /** * DocuSign JWT OAuth2.0 认证模块(Node.js版) * @param {string} integrationKey - 客户端ID * @param {string} userId - 沙箱用户ID * @param {string} rsaPrivateKey - RSA私钥(PEM格式) * @param {string} accountId - 账户ID */ constructor(integrationKey, userId, rsaPrivateKey, accountId) { this.integrationKey = integrationKey; this.userId = userId; this.rsaPrivateKey = rsaPrivateKey; this.accountId = accountId; this.baseUrl = 'https://account-d.docusign.com'; // 沙箱环境 } async getAccessToken() { const now = Math.floor(Date.now() / 1000); const jwtPayload = { iss: this.integrationKey, sub: this.userId, aud: 'account-d.docusign.com', iat: now, exp: now + 3600, scope: 'signature impersonation', }; const jwtToken = jwt.sign(jwtPayload, this.rsaPrivateKey, { algorithm: 'RS256', }); const response = await axios.post(`${this.baseUrl}/oauth/token`, { grant_type: 'urn:ietf:params:oauth:grant-type:jwt-bearer', assertion: jwtToken, }); // 更新为实际账户的base_uri this.baseUrl = response.data.accounts[0].base_uri; return response.data.access_token; } } module.exports = DocuSignAuth;创建并发送信封
单个信封创建(Python)
# create_envelope.py from docusign_esign import ApiClient, EnvelopesApi, EnvelopeDefinition from docusign_esign.models import ( Document, Signer, SignHere, Tabs, Recipients ) def create_envelope(auth, access_token): """ 创建并发送单个签署信封 演示基础的单签署方场景 """ # 初始化API客户端 api_client = ApiClient() api_client.host = auth.base_url api_client.set_default_header("Authorization", f"Bearer {access_token}") # 构造签署文档 document = Document( document_base64="BASE64_ENCODED_DOCUMENT_CONTENT", # 实际使用时替换为文件内容 name="服务协议.pdf", file_extension="pdf", document_id="1" ) # 配置签署方信息 signer = Signer( email="signer_a@example.com", # 示例邮箱,实际使用时替换 name="签署方A", recipient_id="1", routing_order="1" ) # 设置签署位置(坐标为基于文档左上角的偏移) sign_here = SignHere( document_id="1", page_number="1", x_position="100", y_position="150" ) signer.tabs = Tabs(sign_here_tabs=[sign_here]) # 组装信封定义 envelope_definition = EnvelopeDefinition( email_subject="请签署服务协议", documents=[document], recipients=Recipients(signers=[signer]), status="sent" # 直接发送,如需草稿状态设为"created" ) # 调用API创建信封 envelopes_api = EnvelopesApi(api_client) result = envelopes_api.create_envelope( account_id=auth.account_id, envelope_definition=envelope_definition ) print(f"信封创建成功,Envelope ID: {result.envelope_id}") return result.envelope_id批量发送信封(Node.js)
// batchSendEnvelopes.js const docusign = require('docusign-esign-sdk'); const fs = require('fs'); async function batchSendEnvelopes(auth, accessToken, recipientsList) { /** * 批量发送签署信封 * @param {Array} recipientsList - 签署方列表 * 每项格式: { email: string, name: string } */ const apiClient = new docusign.ApiClient(); apiClient.setBasePath(auth.baseUrl); apiClient.addDefaultHeader('Authorization', `Bearer ${accessToken}`); const envelopesApi = new docusign.EnvelopesApi(apiClient); const results = []; for (const [index, recipient] of recipientsList.entries()) { // 读取文档文件并转为Base64 const documentContent = fs.readFileSync('./agreement_template.pdf') .toString('base64'); const envelopeDefinition = new docusign.EnvelopeDefinition(); envelopeDefinition.emailSubject = '请签署合作协议'; // 构造文档对象 const document = new docusign.Document(); document.documentBase64 = documentContent; document.name = `合作协议_${recipient.name}.pdf`; document.fileExtension = 'pdf'; document.documentId = '1'; // 构造签署方 const signer = new docusign.Signer(); signer.email = recipient.email; signer.name = recipient.name; signer.recipientId = '1'; signer.routingOrder = '1'; // 设置签署位置 const signHere = new docusign.SignHere(); signHere.documentId = '1'; signHere.pageNumber = '1'; signHere.xPosition = '100'; signHere.yPosition = '150'; const tabs = new docusign.Tabs(); tabs.signHereTabs = [signHere]; signer.tabs = tabs; envelopeDefinition.documents = [document]; envelopeDefinition.recipients = new docusign.Recipients(); envelopeDefinition.recipients.signers = [signer]; envelopeDefinition.status = 'sent'; try { const result = await envelopesApi.createEnvelope( auth.accountId, { envelopeDefinition } ); results.push({ recipient: recipient.name, envelopeId: result.envelopeId, status: 'sent', }); console.log(`[${index + 1}/${recipientsList.length}] 信封已发送给 ${recipient.name}`); } catch (error) { // 单个信封失败不影响后续批量发送 results.push({ recipient: recipient.name, status: 'failed', error: error.message, }); } } return results; }Webhook回调处理
DocuSign通过Connect服务提供事件回调,当签署状态发生变化时主动推送通知到你的服务端。
回调接收服务(Python/Flask)
# webhook_server.py from flask import Flask, request import hmac import hashlib app = Flask(__name__) # DocuSign Connect配置中的HMAC密钥(在Connect设置页面获取) CONNECT_HMAC_KEY = "YOUR_CONNECT_HMAC_KEY" @app.route("/docusign/webhook", methods=["POST"]) def docusign_webhook(): """ 接收DocuSign Connect推送的签署状态变更事件 需要在DocuSign Connect配置中注册此回调URL """ # 1. 验证请求签名(使用HMAC-SHA256) signature = request.headers.get("X-Docusign-Signature-1", "") body = request.get_data() computed_hmac = hmac.new( CONNECT_HMAC_KEY.encode(), body, hashlib.sha256 ).hexdigest() if not hmac.compare_digest(computed_hmac, signature): return "Signature verification failed", 401 # 2. 解析XML格式的事件通知 from xml.etree import ElementTree as ET root = ET.fromstring(body.decode("utf-8")) # XML命名空间 ns = {"ds": "http://www.docusign.com/restapi"} envelope_status = root.find(".//ds:EnvelopeStatus", ns) if envelope_status is None: return "No status found", 200 envelope_id = envelope_status.find("ds:EnvelopeID", ns).text status = envelope_status.find("ds:Status", ns).text print(f"[Webhook] Envelope {envelope_id} 状态更新为: {status}") # 3. 根据状态执行业务逻辑 if status == "Completed": # 签署完成:下载签署文件、通知业务系统 print(f"信封 {envelope_id} 签署已完成,触发后续流程") handle_completed_envelope(envelope_id) elif status == "Declined": # 签署被拒:通知发起方 print(f"信封 {envelope_id} 被拒签") elif status == "Voided": # 信封作废 print(f"信封 {envelope_id} 已被作废") return "OK", 200 def handle_completed_envelope(envelope_id): """处理签署完成后的业务逻辑""" # 下载签署完成的文档 # 更新业务系统中的合同状态 # 发送完成通知邮件 pass if __name__ == "__main__": app.run(host="0.0.0.0", port=8080)签署状态查询
除了被动接收Webhook通知,也可以主动查询信封状态:
# query_status.py from docusign_esign import ApiClient, EnvelopesApi def query_envelope_status(auth, access_token, envelope_id): """ 主动查询指定信封的签署状态 适用于:轮询检查、异常恢复、手动同步等场景 """ api_client = ApiClient() api_client.host = auth.base_url api_client.set_default_header("Authorization", f"Bearer {access_token}") envelopes_api = EnvelopesApi(api_client) # 获取信封状态 envelope = envelopes_api.get_envelope( account_id=auth.account_id, envelope_id=envelope_id ) status = envelope.status print(f"Envelope {envelope_id}: {status}") # 状态可能的值: sent, delivered, completed, declined, voided # 获取签署方状态详情 recipients = envelopes_api.list_recipients( account_id=auth.account_id, envelope_id=envelope_id ) for signer in recipients.signers: print(f" 签署方 {signer.name}: {signer.status}") # signer.status: created, sent, delivered, signed, completed return { "envelope_id": envelope_id, "status": status, "recipients": [ {"name": s.name, "status": s.status} for s in recipients.signers ] }集成最佳实践
安全要点
- 私钥管理。RSA私钥存储在环境变量或密钥管理服务中,严禁硬编码在代码中。
- 回调验证。Webhook端点务必验证HMAC签名,防止伪造请求。
- Token续期。access_token有效期1小时,需要实现自动续期机制;JWT最长可续到8小时。
性能优化
- 批量发送时使用并发。对于大量信封发送场景,建议使用异步IO或线程池方式提升吞吐量,而非串行循环。
- Webhook幂等处理。DocuSign可能在网络超时后重发回调,业务逻辑需实现幂等性,可通过envelopeId+status作为幂等键。
- 状态缓存。对高频查询的信封状态做本地缓存(如Redis),设置合理的TTL,减少API调用频率。
错误处理策略
# error_handling_example.py import time from docusign_esign.rest import ApiException def create_envelope_with_retry(envelopes_api, account_id, definition, max_retries=3): """ 带重试机制的信封创建 处理API限流(429)和临时网络故障 """ for attempt in range(max_retries): try: result = envelopes_api.create_envelope( account_id=account_id, envelope_definition=definition ) return result except ApiException as e: if e.status == 429: # 被限流,等待后重试 wait_time = 2 ** attempt print(f"API限流,{wait_time}秒后重试...") time.sleep(wait_time) elif e.status >= 500: # 服务端错误,短暂等待后重试 time.sleep(1) else: # 客户端错误,不重试 raise raise Exception(f"重试{max_retries}次后仍未成功")结语
本文覆盖了DocuSign API集成中的四个核心环节:JWT认证、信封创建与批量发送、Webhook事件处理以及状态查询。在实际项目中,这些模块通常会封装为独立的SDK层,由业务系统统一调用。
DocuSign正在向IAM(智能协议管理)全面转型,后续的AI审阅、智能字段提取等功能将进一步扩展API的能力边界。建议开发者关注DocuSign官方开发者中心的最新动态,及时跟进API版本更新。
上海华万通信科技有限公司,专注为企业提供腾讯系SaaS产品的一站式选型与集成服务,包括腾讯会议、企业微信、腾讯电子签等。我们致力于帮助企业实现高效的数字化转型与智能化升级。
