Centrifugo 安装与使用

一、安装

使用 docker-compose 安装,指定 config.json 配置文件。

# Compose file that runs a single Centrifugo v5 container.
version: "3.9"
services:
  centrifugo:
    container_name: centrifugo
    image: centrifugo/centrifugo:v5
    volumes:
      # Mount the config.json that sits next to this compose file into the container.
      - ./config.json:/centrifugo/config.json
    # Start the server with the mounted configuration file.
    command: centrifugo -c config.json
    ports:
      # host:container - port 8000 is the one used for the admin page and the /api calls in this article.
      - 8000:8000
    ulimits:
      # Open-file limit for the container (soft and hard set to the same value).
      nofile:
        soft: 65535
        hard: 65535

docker-compose.yml 如上所示,使用 v5 版本。

{
  "token_hmac_secret_key": "kkc_secret",
  "api_key": "kkc_api_key",
  "admin_password": "password",
  "admin_secret": "secret",
  "admin": true,
  "allowed_origins": ["*"],
  "anonymous": true,
  "publish": true,
  "subscribe_to_publish": true,
  "presence": true,
  "debug":true,
  "client_anonymous":true,
  "join_leave": true,
  "allow_subscribe_for_client": true
}

config.json 配置如上所示。我最初写配置时参考的是官网的 quick start,只包含前 5 项配置,导致 channel 订阅失败,后来才补充了其余的配置项。当然,这里列出的配置仍不完整,具体参数可去官网查看。

执行启动命令

docker-compose up -d

在浏览器中访问 http://<服务器 IP>:8000,并输入配置文件中的 admin_password,即可进入主页面,如图所示:

Centrifugo 主页面

二、服务端推送及客户端 token 获取

2.1 maven 依赖

<dependency>  
    <groupId>com.auth0</groupId>  
    <artifactId>java-jwt</artifactId>  
    <version>3.19.2</version>  
</dependency>
<dependency>  
    <groupId>org.springframework.cloud</groupId>  
    <artifactId>spring-cloud-starter-openfeign</artifactId>  
    <version>3.1.1</version>  
</dependency>

引入 java-jwt 依赖,用于生成 token。

引入 spring-cloud-starter-openfeign 依赖,用于 Centrifugo API 调用。

2.2 客户端 token 生成

/**
 * Generates the connection JWT that a client presents to Centrifugo.
 *
 * <p>The token is signed with HMAC-SHA256. The secret must be identical to
 * {@code token_hmac_secret_key} in config.json so the server can verify the signature.
 *
 * @param user user identifier, written to the {@code sub} claim
 * @return the signed JWT string, expiring 100 days after creation
 */
public static String generateToken(String user) {
    // 100 days in milliseconds. Must be computed as long: the int product
    // 86400 * 100000 overflows and would shorten the lifetime to roughly 13.9 hours.
    long ttlMillis = 86_400L * 100_000L;
    return JWT.create()
            .withExpiresAt(new Date(System.currentTimeMillis() + ttlMillis))
            // withSubject writes the "sub" claim; a separate withClaim("sub", ...) is redundant.
            .withSubject(user)
            // Same value as token_hmac_secret_key in config.json ("kkc_secret", with an underscore).
            .sign(Algorithm.HMAC256("kkc_secret"));
}

上述代码用来生成客户端 token,可自定义过期时间,密钥为 config.json 配置的 token_hmac_secret_key。

2.3 服务端推送

2.3.1 结构体定义
/**
 * Request envelope posted to the Centrifugo HTTP API: a method name plus its parameters.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class InstantStructDTO {

    /** API method to invoke, e.g. {@code "publish"}. */
    private String method;

    /** Parameters passed to the method. */
    private InstantMsgDTO params;
}
/**
 * Parameters of a Centrifugo API command; only the fields a given method needs are populated.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class InstantMsgDTO {

    /** Channel. */
    private String channel;

    /** Channel list. */
    private List<String> channels;

    /** User. */
    private String user;

    /** Data. */
    private JSONObject data;

    /** Info. */
    private JSONObject info;
}
2.3.2 open-feign 调用

统一配置 Header,在 header 中加入 X-API-Key,值为 config.json 中配置的 api_key。

/**
 * Feign configuration for the Centrifugo API client: adds the API key header to every request.
 *
 * <p>This class is referenced through {@code @FeignClient(configuration = ...)} and carries no
 * {@code @Configuration} annotation of its own.
 */
public class InstantMsgConfiguration {

    /** Header that carries the API key. */
    private static final String API_KEY_HEADER = "X-API-Key";

    /** Must equal {@code api_key} in config.json. */
    private static final String API_KEY = "kkc_api_key";

    /**
     * Registers the interceptor that authenticates outgoing requests.
     *
     * @return interceptor that sets the API key header
     */
    @Bean
    public RequestInterceptor apiKeyRequestInterceptor() {
        return new MyBasicAuthRequestInterceptor();
    }

    /**
     * Sets the API key header on each request. Despite the historical name, this is
     * API-key authentication rather than HTTP Basic authentication.
     */
    static class MyBasicAuthRequestInterceptor implements RequestInterceptor {

        @Override
        public void apply(RequestTemplate requestTemplate) {
            requestTemplate.header(API_KEY_HEADER, API_KEY);
        }
    }
}

FallbackFactory 定义。

/**
 * Builds the fallback {@link InstantMsgService} used when a call to the Centrifugo API fails.
 *
 * <p>The fallback never returns {@code null}: the failure is always converted into a
 * {@link ResponseEntity} so callers can inspect status and body uniformly.
 */
@Component
public class InstantMsgServiceFallbackFactory implements FallbackFactory<InstantMsgService> {

    /** Status (Service Unavailable) reported when the failure carries no usable HTTP status. */
    private static final int FALLBACK_STATUS = 503;

    /**
     * Creates a fallback client bound to the failure that triggered it.
     *
     * @param cause the exception raised by the failed call
     * @return a client whose {@code sendMsg} reports {@code cause} as an HTTP-style response
     */
    @Override
    public InstantMsgService create(Throwable cause) {
        return new InstantMsgService() {
            @Override
            public ResponseEntity<Object> sendMsg(InstantStructDTO body) {
                if (cause instanceof FeignException) {
                    FeignException feignException = (FeignException) cause;
                    int status = feignException.status();
                    // NOTE(review): Feign appears to report a status outside the HTTP range
                    // (e.g. -1) when no response was received; only pass through real codes.
                    if (status >= 100 && status <= 999) {
                        return ResponseEntity.status(status).body(feignException.contentUTF8());
                    }
                }
                // Non-HTTP failure (or no response at all): report it instead of returning null.
                return ResponseEntity.status(FALLBACK_STATUS).body(String.valueOf(cause));
            }
        };
    }
}

service 定义。

/**
 * Feign client for the Centrifugo server HTTP API.
 */
@FeignClient(
        name = "instantMsg-server",
        url = "http://10.64.4.78:8000",
        configuration = InstantMsgConfiguration.class,
        fallbackFactory = InstantMsgServiceFallbackFactory.class)
public interface InstantMsgService {

    /**
     * Posts a command to the {@code /api} endpoint.
     *
     * @param body command envelope (method name and its parameters)
     * @return the HTTP response of the call
     */
    @RequestMapping(value = "/api", method = RequestMethod.POST)
    ResponseEntity<Object> sendMsg(@RequestBody InstantStructDTO body);
}

当然,别忘了在启动类上加 @EnableFeignClients 注解。

2.3.3 服务端推送
// Feign client used to call the Centrifugo HTTP API; injected via @Resource.
@Resource
private InstantMsgService instantMsgService;

/**
 * Publishes a service's health status to the {@code service-register-health} channel
 * and logs the API response at debug level.
 *
 * @param serviceCode  code of the service whose status is reported
 * @param healthStatus health flag sent alongside the service code
 */
private void sendStatus(String serviceCode, Boolean healthStatus) {
    JSONObject payload = new JSONObject();
    payload.put("serviceCode", serviceCode);
    payload.put("healthStatus", healthStatus);

    InstantMsgDTO publishParams = new InstantMsgDTO();
    publishParams.setChannel("service-register-health");
    publishParams.setData(payload);

    InstantStructDTO command = InstantStructDTO.builder()
            .method("publish")
            .params(publishParams)
            .build();

    ResponseEntity<Object> reply = instantMsgService.sendMsg(command);
    log.debug("instant msg service response:{}", reply);
}

上述代码就是将服务的健康状态使用 Centrifugo 推送给前端。这里 feign 调用的过程就是执行下面的 HTTP 请求。

# Raw HTTP form of the publish call: POST the command JSON to /api with the API key header.
curl --location --request POST '10.64.4.78:8000/api' \
--header 'X-API-Key: kkc_api_key' \
--header 'Content-Type: application/json' \
--data-raw '{
    "method": "publish",
    "params": {
        "channel": "service-register-health",
        "data": {
            "serviceCode": "kkc-whale",
            "healthStatus": true
        }
    }
}'

前端建立连接,订阅 channel 后会收到如下消息:

前端连接消息

{"connect":{"token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTc0NTA2MTE0MH0.POlYmjLFv2xJOjIzNkuJiy5tkpRNz5xynimcxyXufn4","name":"js"},"id":1}
{"subscribe":{"channel":"service-register-health"},"id":2}
{"id":1,"connect":{"client":"3c950d00-fcc8-4e89-aae6-34066c582791","version":"5.4.9","expires":true,"ttl":8639947,"ping":25,"pong":true}}
{"id":2,"subscribe":{}}

收到推送消息时,如下:

推送消息

{"push":{"channel":"service-register-health","pub":{"data":{"serviceCode":"kkc-whale","healthStatus":false}}}}

三、参考

官网:https://centrifugal.dev/