文章标签 ‘budwk’
2024七月23

BudIot 开源物联网设备平台v1.0发布

BUDIOT 是一个开源的、企业级的物联网平台,它集成了设备管理、协议解析、消息订阅、场景联动等一系列物联网核心能力,支持以平台适配设备的方式连接海量设备,支持在线下发指令实现远程控制,支持扩展水电气等各类计费业务场景。

本平台是在千万级设备实时计费物联网平台经验基础上,在不损失性能的前提下进行功能删减、结构优化而来,小而美,同时又具备灵活的扩展性。

源码: https://github.com/budwk/budiot

在线演示地址: https://demo.budiot.com 用户名: superadmin 密码: 1

官网: https://budiot.com

开发框架

基于自研 Java 微服务框架 https://budwk.com

简单说明

Jar 运行模块

  • budiot-access/budiot-access-gateway 设备网关,用于设备协议和 network 组件
  • budiot-access/budiot-access-processor 设备数据上报业务处理模块
  • budiot-server WEB 服务 API ,定时任务等

其他模块说明

  • budiot-access/budiot-access-network 网络组件,支持 TCP/MQTT/UDP/HTTP 等
  • budiot-access/budiot-access-protocol 设备协议开发包,内含 demo 示例
  • budiot-access/budiot-access-storage 设备数据存储,可扩展时序数据库等

前端模块

  • budiot-vue-admin Vue3 + Element-Plus

开发环境

  • OpenJDK 11
  • Redis 6.x
  • MariaDB 10.x
  • MongoDB 7.0.x
  • RocketMQ 5.2.x

设备上报有效数据存储

默认采用 MongoDB 7 的时序集合,可根据项目规模需要,扩展为 TDEngine 等时序数据库

2024七月8

600万设备连接平台遇到的坑

MongoDB 的坑

listCollections

  • 问题:代码中存在 listCollections 操作,大量数据上报时,造成MongoDB CPU高升,使处理性能受到影响。经过检查,发现通信报文日表是自动创建的,每次都会判断集合是否存在;而mongo驱动包里判断集合是否存在的操作,就是先执行 listCollections;
  • 优化:提前创建好需要的集合,不要在数据上报的时候进行判断、创建;ps:其实其他数据库,比如mysql、tdengine等,都是同样的道理;

连接数过大

  • 问题:MongoDB 连接数配置较大,导致很多handler服务,过多的线程会导致上下文切换开销变大,同时内存开销也会上涨;
  • 优化:调低连接数配置,进行压力测试;

规则引擎的坑

  • 问题:为了提升性能,往往采用队列+规则引擎来处理业务,设备协议解析+计费业务是在一个handler里,而规则引擎则在下一个队列处理;规则引擎负责为欠费的表具创建短信提醒、关阀指令(产品欠费规则配置),往往在第一个队列里需等待N秒,将规则引擎创建的指令一起下方给表具,每天几百万表具实时上报数据+实时计费,数据并发量较大,如果都等待N秒,会严重影响处理性能;
  • 优化:对计费后余额大于等于0的的表具,不等待N秒,直接回复结束指令,绝大部分表具其实已经是关阀状态,无需本次下发关阀指令,哪怕有需要关阀的,延后到下次通信再执行也没影响;对于小于0的表具,则使用原有逻辑;优化后,降低了90%的等待时间,大大提升高峰高并发处理性能;

HTTP订阅的坑

  • 问题:前期主要通过AEP平台的http订阅实现NB表的通信,但是http服务在高并发时有时候会挂掉(具体表象就是两个http服务,只有一个存活,另一个服务存在,但不处理数据);
  • 优化:将http服务单独服务部署,不要和其他handler、服务抢占服务器资源,jvm也就不会崩了;后期采用MQ订阅方式;

站内信的坑

  • 问题:原有产品设计当设备产生告警时(原生告警、规则告警等)有站内信提醒,经过项目实际运行,站内信数据非常庞大,时刻都有设备告警,导致web页面卡顿、后台web服务资源占用高;
  • 优化:去掉设备站内信告警功能,一是站内信管理人员完全看不过来,二是设备告警在功能菜单里可以查询到;

2021七月30

MongoDB 5.0 时序集合数据示例代码

采用最新Java驱动

<dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-sync</artifactId>
            <version>4.3.0</version>
</dependency>

获取数据源及数据库


/**
 * @author wizzer@qq.com
 */
public class ZMongoDatabase {

    private MongoDatabase db;

    public ZMongoDatabase(MongoDatabase db) {
        this.db = db;
    }

    /**
     * 获取集合,集合不存在则返回 null
     *
     * @param name 集合名称
     * @return 集合薄封装
     */
    public MongoCollection<Document> getCollection(String name) {
        if (!this.collectionExists(name)) {
            return null;
        }
        return db.getCollection(name);
    }

    /**
     * 获取一个集合,如果集合不存在,就创建它
     *
     * @param name         集合名
     * @param dropIfExists true 如果存在就清除
     * @return 集合薄封装
     */
    public MongoCollection<Document> createCollection(String name, boolean dropIfExists) {
        // 不存在则创建
        if (!this.collectionExists(name)) {
            return createCollection(name, null);
        }
        // 固定清除
        else if (dropIfExists) {
            db.getCollection(name).drop();
            return createCollection(name, null);
        }
        // 已经存在
        return db.getCollection(name);
    }

    /**
     * 获取一个集合,如果集合不存在,就创建它
     *
     * @param name         集合名
     * @param options      集合配置信息
     * @param dropIfExists true 如果存在就清除
     * @return 集合薄封装
     */
    public MongoCollection<Document> createCollection(String name, CreateCollectionOptions options, boolean dropIfExists) {
        // 不存在则创建
        if (!this.collectionExists(name)) {
            return createCollection(name, options);
        }
        // 固定清除
        else if (dropIfExists) {
            db.getCollection(name).drop();
            return createCollection(name, options);
        }
        // 已经存在
        return db.getCollection(name);
    }

    /**
     * 创建一个集合
     *
     * @param name    集合名
     * @param options 集合配置信息
     * @return 集合薄封装
     */
    public MongoCollection<Document> createCollection(String name, CreateCollectionOptions options) {
        if (this.collectionExists(name)) {
            throw Lang.makeThrow("Colection has exists: %s.%s", db.getName(), name);
        }
        // 创建默认配置信息
        if (null == options) {
            options = new CreateCollectionOptions().capped(false);
        }
        db.createCollection(name, options);
        return db.getCollection(name);
    }

    /**
     * 判断集合是否存在
     *
     * @param collectionName 集合名
     * @return
     */
    public boolean collectionExists(String collectionName) {
        return listCollectionNames().contains(collectionName);
    }

    /**
     * @return 当前数据库所有可用集合名称
     */
    public List<String> listCollectionNames() {
        return db.listCollectionNames().into(new ArrayList<String>());
    }

    public MongoDatabase getNativeDB() {
        return this.db;
    }
}

初始化数据库示例

@Inject
    private ZMongoDatabase zMongoDatabase;

    public void init() {
        CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
        TimeSeriesOptions timeSeriesOptions = new TimeSeriesOptions("ts");
        timeSeriesOptions.metaField("metadata");
        timeSeriesOptions.granularity(TimeSeriesGranularity.SECONDS);
        collectionOptions.timeSeriesOptions(timeSeriesOptions);
        MongoCollection<Document> deviceCollection = zMongoDatabase.createCollection("device", collectionOptions, true);
        List<Document> list = new ArrayList<>();
        Device device = new Device(Times.now(), 36.7, "0001");
        list.add(new Document().append("ts", device.getTs()).append("temperature", device.getTemperature()).append("metadata",
                new Document().append("no", device.getNo())));

        Device device1 = new Device(Times.now(), 35.2, "0002");
        list.add(new Document().append("ts", device1.getTs()).append("temperature", device1.getTemperature()).append("metadata",
                new Document().append("no", device1.getNo())));

        Device device2 = new Device(Times.now(), 10.7, "0002");
        list.add(new Document().append("ts", device2.getTs()).append("temperature", device2.getTemperature()).append("metadata",
                new Document().append("no", device2.getNo())));

        Device device3 = new Device(Times.nextDay(Times.now(), 1), 30.0, "0002");
        list.add(new Document().append("ts", device3.getTs()).append("temperature", device3.getTemperature()).append("metadata",
                new Document().append("no", device3.getNo())));

        // Document.parse(Json.toJson(new Device()));

        InsertManyResult insertManyResult = deviceCollection.insertMany(list);
        log.info(Json.toJson(insertManyResult));
    }

时序数据求平均值示例

    // 官方时序数据统计Demo https://docs.mongodb.com/manual/core/timeseries-collections/
    @Ok("json:full")
    @At("/avg")
    public Object avg(@Param(value = "no", df = "0002") String no) {
        List<Bson> bsons = new ArrayList<>();
        // 筛选条件
        Bson match = Aggregates.match(Filters.eq("metadata.no", no));

        // 输出对象
        Bson project = Aggregates.project(Projections.fields(
                // 日期格式化
                Projections.computed("date", new Document("$dateToParts", new Document().append("date", "$ts"))),
                Projections.computed("temperature", 1)
        ));

        // 分组统计 平均值
        Bson group = Aggregates.group(new Document("date", new Document().append("year", "$date.year").append("month", "$date.month").append("day", "$date.day")),
                Accumulators.avg("avgTemp", "$temperature")
        );

        bsons.add(match);
        bsons.add(project);
        bsons.add(group);
        log.info(Json.toJson(bsons));
        return zMongoDatabase.getCollection("device").aggregate(bsons);
    }

完整代码:

nutzboot starter 组件

nutzboot-starter-mongodb-plus

组件使用示例

nutzboot-demo-simple-mongodb-plus

i’am a separator…

2021三月25

TDengine 时序数据库的 NutzBoot 开发实例

nutz 及 nutzboot 已支持 TDengine

基于 nutzboot 开发 TDengine 实例

开发环境

  • 服务端:CentOS 8.2 64 位
  • 客户端:Windows 10 64 位

TDengine 安装及配置

  • 官网下载 rpm 安装包
  • 执行安装 rpm -ivh TDengine-server-2.0.18.0-Linux-x64.rpm
  • 修改配置文件 vi /etc/taos/taos.cfg 加上当前服务器 hostname 主机名
# first fully qualified domain name (FQDN) for TDengine system
firstEp                   wizzer-test:6030
# local fully qualified domain name (FQDN)
fqdn                      wizzer-test

  • 若为默认密码则直接输入 taos 或 taos -h 127.0.0.1 执行数据库创建命令
taos > create database test;

Windows 10 hosts 配置

  • 修改 C:\Windows\System32\drivers\etc\hosts
  • 添加 ip wizzer-test

创建 nutzboot Maven 项目

  • pom.xml 文件加入 nutzboot 及 TDengine JDBC 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.budwk</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <nutzboot.version>2.4.2-SNAPSHOT</nutzboot.version>
        <jaxb-api.version>2.3.1</jaxb-api.version>
        <slf4j.version>1.7.25</slf4j.version>
        <logback.version>1.2.3</logback.version>
        <taos-jdbcdriver.version>2.0.23</taos-jdbcdriver.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.nutz</groupId>
            <artifactId>nutzboot-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.nutz</groupId>
            <artifactId>nutzboot-starter-nutz-dao</artifactId>
        </dependency>
        <dependency>
            <groupId>org.nutz</groupId>
            <artifactId>nutzboot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.taosdata.jdbc</groupId>
            <artifactId>taos-jdbcdriver</artifactId>
            <version>${taos-jdbcdriver.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.nutz</groupId>
                <artifactId>nutzboot-parent</artifactId>
                <version>${nutzboot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <repositories>
        <repository>
            <id>nutz</id>
            <url>http://jfrog.nutz.cn/artifactory/libs-release</url>
        </repository>
        <repository>
            <id>nutz-snapshots</id>
            <url>http://jfrog.nutz.cn/artifactory/snapshots</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>nutz-snapshots</id>
            <url>http://jfrog.nutz.cn/artifactory/snapshots</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </pluginRepository>
    </pluginRepositories>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerArgs>
                        <arg>-parameters</arg>
                    </compilerArgs>
                    <useIncrementalCompilation>false</useIncrementalCompilation>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.nutz.boot</groupId>
                <artifactId>nutzboot-maven-plugin</artifactId>
                <version>${nutzboot.version}</version>
            </plugin>
        </plugins>
    </build>
</project>
  • 创建实体类
/**
 * 注意 TDengine 表及字段名都为小写字母
 */
@Table("iot_dev")
public class Iot_dev implements Serializable {
    private static final long serialVersionUID = 1L;

    @Column
    @Comment("ID")
    @ColDefine(type = ColType.TIMESTAMP)
    private Date ts;

    @Column("devid") //字段名都为小写字母
    @Comment("设备 ID")
    @ColDefine(type = ColType.VARCHAR, width = 32)
    private String devId;

    @Column("devtype") //字段名都为小写字母
    @Comment("设备类型")
    @ColDefine(type = ColType.BINARY, width = 32)
    private String devType;

    @Column
    @Comment("状态")
    @ColDefine(type = ColType.BOOLEAN)
    private Boolean status;

    @Column
    @Comment("读数 1")
    @ColDefine(type = ColType.DOUBLE)
    private Double val1;

    @Column
    @Comment("读数 2")
    @ColDefine(type = ColType.INT)
    private Integer val2;

    @Column
    @Comment("读数 3")
    @ColDefine(type = ColType.INT,width = 3)
    private Integer val3;

    @Column
    @Comment("读数 4")
    @ColDefine(type = ColType.INT,width = 2)
    private Integer val4;

}

完整代码见

https://gitee.com/wizzer/demo/tree/master/nutzboot-tdengine-demo
2021三月24

BudWK V6 代码生成器 IDEA 插件发布

本插件不同于V5代码生成器插件,无须引入项目中其他jar包,无须事先编译POJO类:

  • 插件不依赖任何第三方jar包
  • 通过 POJO 类生成接口类、接口实现类、控制类
  • IDEA 须从项目根目录打开加载项目(以获取正确的 projectBasePath )
  • 打开 POJO 类Java文件,在文件内部右击选择”Generate”->”WkCodeGenerator”

插件下载:
https://gitee.com/budwk/budwk-codegenerator/releases

插件源码:

https://gitee.com/budwk/budwk-codegenerator
https://github.com/budwk/budwk-codegenerator
2020十月28

BudWK V6 之微信支付V3开发流程

  • 一、商户后台设置V3 Key密钥及下载V3 API证书(三个文件分别为apiclient_key.pem、apiclient_cert.pem、apiclient_cert.p12)
  • 二、设计表结构实现管理功能
package com.budwk.nb.wx.models;

import com.budwk.nb.commons.base.model.BaseModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.nutz.dao.entity.annotation.*;
import org.nutz.dao.interceptor.annotation.PrevInsert;

import java.io.Serializable;

/**
 * 微信支付配置表
 * @author wizzer@qq.com
 */
@Data
@EqualsAndHashCode(callSuper = true)
@Table("wx_pay")
public class Wx_pay extends BaseModel implements Serializable {
    private static final long serialVersionUID = 1L;
    @Column
    @Name
    @Comment("ID")
    @ColDefine(type = ColType.VARCHAR, width = 32)
    @PrevInsert(els = {@EL("uuid()")})
    private String id;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 32)
    private String name;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 32)
    private String mchid;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 50)
    private String v3key;

    /**
     * apiclient_key.pem 物理路径
     */
    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String v3keyPath;

    /**
     * apiclient_cert.pem 物理路径
     */
    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String v3certPath;

    /**
     * apiclient_cert.p12 物理路径
     */
    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String v3certP12Path;

   /**
     * 平台证书失效时间
     */
    @Column
    private Long expire_at;
}
package com.budwk.nb.wx.models;

import com.budwk.nb.commons.base.model.BaseModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.nutz.dao.entity.annotation.*;
import org.nutz.dao.interceptor.annotation.PrevInsert;

import java.io.Serializable;

/**
 * 平台证书临存表
 * @author wizzer@qq.com
 */
@Data
@EqualsAndHashCode(callSuper = true)
@Table("wx_pay_cert")
@TableIndexes({@Index(name = "INDEX_WX_PAY_CERT", fields = {"mchid", "serial_no"}, unique = true)})
public class Wx_pay_cert extends BaseModel implements Serializable {
    private static final long serialVersionUID = 1L;
    @Column
    @Name
    @Comment("ID")
    @ColDefine(type = ColType.VARCHAR, width = 32)
    @PrevInsert(els = {@EL("uuid()")})
    private String id;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 32)
    private String mchid;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String serial_no;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String effective_time;

    @Column
    private Long effective_at;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String expire_time;

    @Column
    private Long expire_at;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String algorithm;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String nonce;

    @Column
    @ColDefine(type = ColType.VARCHAR, width = 255)
    private String associated_data;

    @Column
    @ColDefine(type = ColType.TEXT)
    private String ciphertext;

    @Column
    @ColDefine(type = ColType.TEXT)
    private String certificate;
}
  • 三、封装下订单/JSAPI/平台证书更新等功能服务类
package com.budwk.nb.web.commons.ext.wx;

import com.alibaba.dubbo.config.annotation.Reference;
import com.budwk.nb.web.commons.base.Globals;
import com.budwk.nb.wx.models.Wx_pay;
import com.budwk.nb.wx.models.Wx_pay_cert;
import com.budwk.nb.wx.services.WxPayCertService;
import com.budwk.nb.wx.services.WxPayService;
import org.nutz.dao.Chain;
import org.nutz.dao.Cnd;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.json.Json;
import org.nutz.lang.Strings;
import org.nutz.lang.Times;
import org.nutz.lang.util.NutMap;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import org.nutz.weixin.bean.WxPay3Response;
import org.nutz.weixin.util.WxPay3Api;
import org.nutz.weixin.util.WxPay3Util;

import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;

/**
 * @author wizzer@qq.com
 */
@IocBean
public class WxPay3Service {
    private static final Log log = Logs.get();
    private static final SimpleDateFormat DATE_TIME_ZONE = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX");
    @Inject
    @Reference(check = false)
    private WxPayCertService wxPayCertService;
    @Inject
    @Reference(check = false)
    private WxPayService wxPayService;


    // 通过商户号获取 wx_pay 对象
    public synchronized Wx_pay getWxPay(String mchid) {
        Wx_pay wxPay = Globals.WxPay3Map.getAs(mchid, Wx_pay.class);
        if (wxPay == null) {
            wxPay = wxPayService.fetch(Cnd.where("mchid", "=", mchid));
            Globals.WxPay3Map.put(wxPay.getMchid(), wxPay);
        }
        checkPlatfromCerts(wxPay);
        return wxPay;
    }

    // 检查及更新平台证书机制
    public void checkPlatfromCerts(Wx_pay wxPay) {
        if (wxPay == null)
            throw new IllegalStateException("Wx_pay is null");
        if (wxPay.getExpire_at() == null || wxPay.getExpire_at() == 0 || wxPay.getExpire_at() < 8 * 3600 * 1000 + System.currentTimeMillis()) {
            getPlatfromCerts(wxPay.getMchid(), wxPay.getV3key(), wxPay.getV3keyPath(), wxPay.getV3certPath());
            wxPay = wxPayService.fetch(Cnd.where("mchid", "=", wxPay.getMchid()));
            Globals.WxPay3Map.put(wxPay.getMchid(), wxPay);
        }
    }

    // jsapi 订单下单
    public WxPay3Response v3_order_jsapi(String mchid, String body) throws Exception {
        log.debug("v3_order_jsapi body::" + body);
        String serialNo = WxPay3Util.getCertSerialNo(getWxPay(mchid).getV3certPath());
        return WxPay3Api.v3_order_jsapi(mchid, serialNo, getWxPay(mchid).getV3keyPath(), body);
    }

    // 通过jsapi 订单号生成js参数
    public NutMap v3_call_jsapi(String mchid, String appid, String prepay_id) throws Exception {
        return WxPay3Util.getJsapiSignMessage(appid, prepay_id, getWxPay(mchid).getV3keyPath());
    }

    // 验证http响应签名结果
    public boolean verifySignature(WxPay3Response wxPay3Response, String mchid) throws Exception {
        Wx_pay_cert wxPayCert = wxPayCertService.fetch(Cnd.where("mchid", "=", mchid).and("serial_no", "=", wxPay3Response.getHeader().get("Wechatpay-Serial")));
        return WxPay3Util.verifySignature(wxPay3Response, wxPayCert.getCertificate());
    }

    // 验证回调通知签名及内容
    public String verifyNotify(String mchid, String serialNo, String body, String signature, String nonce, String timestamp) throws Exception {
        Wx_pay_cert wxPayCert = wxPayCertService.fetch(Cnd.where("mchid", "=", mchid).and("serial_no", "=", serialNo));
        return WxPay3Util.verifyNotify(serialNo, body, signature, nonce, timestamp,
                getWxPay(mchid).getV3key(), wxPayCert.getCertificate());
    }

    /**
     * 请求并保存新证书
     *
     * @param mchid
     * @return
     */
    public void getPlatfromCerts(String mchid, String v3Key, String v3KeyPatch, String v3CertPath) {
        try {
            wxPayCertService.clear(Cnd.where("mchid", "=", mchid).and("expire_at", "<", System.currentTimeMillis()));
            String serialNo = WxPay3Util.getCertSerialNo(v3CertPath);
            WxPay3Response wxPay3Response = WxPay3Api.v3_certificates(mchid, serialNo, v3KeyPatch);
            if (wxPay3Response.getStatus() == 200) {
                NutMap nutMap = Json.fromJson(NutMap.class, wxPay3Response.getBody());
                List<NutMap> list = nutMap.getList("data", NutMap.class);
                for (NutMap cert : list) {
                    Wx_pay_cert wxPayCert = new Wx_pay_cert();
                    wxPayCert.setMchid(mchid);
                    wxPayCert.setEffective_time(cert.getString("effective_time"));
                    wxPayCert.setExpire_time(cert.getString("expire_time"));
                    long expire_at = 0;
                    try {
                        expire_at = Times.parse(DATE_TIME_ZONE, cert.getString("expire_time")).getTime();
                        wxPayCert.setEffective_at(Times.parse(DATE_TIME_ZONE, cert.getString("effective_time")).getTime());
                        wxPayCert.setExpire_at(expire_at);
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                    wxPayCert.setSerial_no(cert.getString("serial_no"));
                    NutMap encrypt_certificate = cert.getAs("encrypt_certificate", NutMap.class);
                    wxPayCert.setAlgorithm(encrypt_certificate.getString("algorithm"));
                    wxPayCert.setAssociated_data(encrypt_certificate.getString("associated_data"));
                    wxPayCert.setCiphertext(encrypt_certificate.getString("ciphertext"));
                    wxPayCert.setNonce(encrypt_certificate.getString("nonce"));
                    String platformCertificate = WxPay3Util.decryptToString(v3Key.getBytes(StandardCharsets.UTF_8),
                            encrypt_certificate.getString("associated_data").getBytes(StandardCharsets.UTF_8),
                            encrypt_certificate.getString("nonce").getBytes(StandardCharsets.UTF_8),
                            encrypt_certificate.getString("ciphertext")
                    );
                    wxPayCert.setCertificate(platformCertificate);
                    try {
                        wxPayCertService.insert(wxPayCert);
                    } catch (Exception e) {
                        //重复的插入会报错,不管它
                    }
                }
                Wx_pay_cert wxPayCert = wxPayCertService.fetch(Cnd.where("mchid", "=", mchid).orderBy("effective_at", "desc"));
                if (wxPayCert != null) {
                    wxPayService.update(Chain.make("expire_at", wxPayCert.getExpire_at()), Cnd.where("mchid", "=", mchid));
                }
            }
        } catch (Exception e) {
            log.errorf("获取平台证书失败,mchid=%s", mchid, e);
        }
    }
}
  • 四、小程序支付业务代码
@Test
    public void test_v3_order() throws Exception {
        String orderPayNo = R.UU32();
        String orderId = R.UU32();
        NutMap wxPayUnifiedOrder = NutMap.NEW();
        wxPayUnifiedOrder.addv("appid", appid);
        wxPayUnifiedOrder.addv("mchid", mchid);
        wxPayUnifiedOrder.addv("description", new String(("LaiShop-order-" + orderId).getBytes(), StandardCharsets.UTF_8));
        wxPayUnifiedOrder.addv("out_trade_no", orderPayNo);
        Date now = new Date();
        wxPayUnifiedOrder.addv("time_expire", DateUtil.getDateAfterMinute(now, 30));
        // 回调通知URL传递mchid商户号,便于系统支持接入N个小程序及支付商户账号
        wxPayUnifiedOrder.addv("notify_url", Globals.AppDomain + "/shop/open/wxpay/" + mchid + "/notify");
        wxPayUnifiedOrder.addv("amount", NutMap.NEW().addv("total", 1).addv("currency", "CNY"));
        wxPayUnifiedOrder.addv("payer", NutMap.NEW().addv("openid", "o9Bnd4lXKfNsOci-6H98zCMWyBps"));
        String body = Json.toJson(wxPayUnifiedOrder);
        System.out.println("body::" + body);
        WxPay3Response wxPay3Response = wxPay3Service.v3_order_jsapi(mchid, body);
        System.out.println("wxPay3Response::" + Json.toJson(wxPay3Response));
        boolean verifySignature = wxPay3Service.verifySignature(wxPay3Response, mchid);
        System.out.println("verifySignature::" + verifySignature);
        NutMap v3order = Json.fromJson(NutMap.class, wxPay3Response.getBody());
        NutMap resp = wxPay3Service.v3_call_jsapi(mchid, appid, v3order.getString("prepay_id"));
        System.out.println("resp::" + Json.toJson(resp));
    }
  • 五、回调通知业务代码
package com.budwk.nb.web.controllers.open.pay;

import com.alibaba.dubbo.config.annotation.Reference;
import com.budwk.nb.web.commons.ext.wx.WxPay3Service;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.servers.Server;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.json.Json;
import org.nutz.lang.Streams;
import org.nutz.lang.util.NutMap;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import org.nutz.mvc.adaptor.VoidAdaptor;
import org.nutz.mvc.annotation.AdaptBy;
import org.nutz.mvc.annotation.At;
import org.nutz.mvc.annotation.Ok;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

/**
 * @author wizzer@qq.com
 */
@IocBean
@At("/shop/open/wxpay")
@Ok("json")
@OpenAPIDefinition(tags = {@Tag(name = "商城_微信支付回调")}, servers = @Server(url = "/"))
public class WxPay3NotifyController {
    private static final Log log = Logs.get();
    @Inject
    private WxPay3Service wxPay3Service;

    @At("/{mchid}/notify")
    @Ok("raw")
    @AdaptBy(type = VoidAdaptor.class)
    public void notify(String mchid, Reader reader, HttpServletRequest req, HttpServletResponse resp) throws IOException {
        try {
            NutMap map = NutMap.NEW();
            String timestamp = req.getHeader("Wechatpay-Timestamp");
            String nonce = req.getHeader("Wechatpay-Nonce");
            String serialNo = req.getHeader("Wechatpay-Serial");
            String signature = req.getHeader("Wechatpay-Signature");
            log.debugf("timestamp=%s,nonce=%s,serialNo=%s,signature=%s", timestamp, nonce, serialNo, signature);
            String body = Streams.readAndClose(reader);
            // 需要通过证书序列号查找对应的证书,verifyNotify 中有验证证书的序列号
            String plainText = wxPay3Service.verifyNotify(mchid, serialNo, body, signature, nonce, timestamp);
            log.debugf("支付通知明文=%s", plainText);
            NutMap res = Json.fromJson(NutMap.class, plainText);
            NutMap payer = res.getAs("payer", NutMap.class);
            String trade_state = res.getString("trade_state");
            String out_trade_no = res.getString("out_trade_no");
            String openid = payer.getString("openid");
            boolean ok = true;//业务代码入库
            if ("SUCCESS".equals(trade_state) && ok) {
                resp.setStatus(200);
                map.put("code", "SUCCESS");
                map.put("message", "SUCCESS");
            } else {
                resp.setStatus(500);
                map.put("code", "ERROR");
                map.put("message", "签名错误");
            }
            resp.setHeader("Content-type", "application/json");
            resp.getOutputStream().write(Json.toJson(map).getBytes(StandardCharsets.UTF_8));
            resp.flushBuffer();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

本文档以 BudWk 框架代码为例,源码地址: https://gitee.com/budwk/budwk-nutzboot

演示地址: https://demo.budwk.com

2020九月12

NutzWk 5.2.7 发布,Java 国产微服务分布式开发框架

本次 v5.2.7 发布带来三个版本:

  • v5.2.7-nacos Nacos 注册中心及配置中心功能微服务版本
  • v5.2.7-zookeeper Zookeeper 注册中心微服务版本
  • v5.2.7-mini 单应用一个 Jar 或 War 即可启动运行版本

技术体系

后端技术:nutzboot(国产,基于国产nutz) + dubbo + redis + shiro + quartz + logback + beetl 等主流技术
前端技术:jquery + vue.js + elementUI 和 jquery + bootstrap 两个版本可选

演示地址

V6演示地址: https://demo.budwk.com (前后端分离 nuxt+vue+elementUI)

V5演示地址: https://nutzwk.wizzer.cn (beetl+html+vue.js+elementUI)

项目介绍

NutzWk(V6起更名为BudWk)发展自2010年,2012年开始用于商业项目,至今已服务于全国各地公司大大小小上千个项目,行业涉及政务、电商、物联网等,随着经验积累及从事行业的不同分别发布了v1.x至v6.x多个版本,每个版本都是完整运行且完全开源免费的,您可以根据项目规模选择不同版本。本项目案例众多,省厅级项目、市级平台、大数据项目、电商平台、物联网平台等等,详见官网 https://budwk.com

源码地址:

V6源码地址: https://gitee.com/budwk/budwk-nutzboot

V1-V5源码地址: https://gitee.com/wizzer/NutzWk

2020八月21

BudWk 国产开源Java微服务分布式框架在智慧燃气行业的应用

燃气公司现状及痛点

  • 燃气表品牌多、型号多、计费类型多,厂家附送系统各自独立且无法自动对账,导致账目误差时有出现,实际经营情况无法实时掌握。
  • 物联网智能表具原来越多,物联网设备本身的安全监测是当前的核心问题,且因设备厂家、型号多样化,设备的统一接入、监控就尤为重要。
  • 从市场拓展到客户服务无法全流程业务管控、联动和监督,导致物资储备、施工安排、工程监督等无法根据实际情况实时管控,用户开户、移表等服务无法高效响应和调度。

BudWk微服务解决方案

  • IC卡表的统一集成

使用 WPF + CefSharp 技术,C/S客户端 + B/S浏览器的组合,利用客户端实现IC卡读写器的集成开发,实现IC卡表的读写功能,利用B/S浏览器,将营收系统嵌入浏览器,实现WEB营收业务代码热更新、数据统一管理等功能。

  • 物联网表的统一集成

抽象设备接入层,实现电信AEP平台、移动OneNET、厂家物联网平台等平台接入,实现 NB-IOT/MQTT/HTTP 等协议的适配和转换,将各表厂繁杂不一的数据格式转换为本平台统一数据格式,并利用规则引擎技术,实现数据的智能化处理。

  • 计费类型的统一集成

系统内置预付费、后付费、表端计费等计费类型,支持“购气/退气”和“充值/退费”等业务形态,支持阶梯计价、区域计价等价格规则。

  • 工单系统的统一集成

燃气报装、报修、维修、安检等业务流程标准化、制度化,通过流程配置、节点配置、权限配置等,实现业务工单的动态分配和统一管理。

后端技术框架

采用 BudWk 国产微服务分布式架构,基于 nutzboot + dubbo + nacos + druid 技术体系,核心框架为国产开源框架 nutzboot,采用 Sa-Token权限系统及JWT。根据业务划分微服务模块,如:

  • Sys – 系统及权限模块
  • Cms – 内容及资讯模块
  • Wx – 微信服务模块
  • Dev – 物联网表接入模块
  • Gas – 营收业务模块
  • WebAPI – 后端服务API模块
  • OpenAPI – 第三方服务API模块

前端技术框架

采用 Vite + Vue3 + ElementPlus 常用组合,前后端分离开发模式,封装集成多语言、路由、权限控制、文件上传等功能。

BudWk微服务分布式框架介绍

BudWk(原名NutzWk)发展自2010年,2012年开始用于商业项目,至今已服务于全国各地公司大大小小数千个项目,行业涉及政务、电商、物联网等,随着个人经验积累及从事行业的不同分别发布了1.x至8.x多个版本,您可以根据项目规模选择不同版本。本项目案例众多,省厅级项目、市级平台、大数据项目、电商平台、物联网平台等等。

https://demo.budwk.com V8演示地址

https://nutzwk.wizzer.cn V5演示地址

https://budwk.com 官网及开发指南