溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spring boot項目redisTemplate實現輕量級消息隊列的方法

發布時間:2020-09-15 03:50:11 來源:腳本之家 閱讀:224 作者:wangzaiplus 欄目:編程語言

背景

公司項目有個需求, 前端上傳excel文件, 后端讀取數據、處理數據、返回錯誤數據, 最簡單的方式同步處理, 客戶端上傳文件后一直阻塞等待響應, 但用戶體驗無疑很差, 處理數據可能十分耗時, 沒人愿意傻等, 由于項目暫未使用ActiveMQ等消息隊列中間件, 而redis的lpush和rpop很適合作為一種輕量級的消息隊列實現, 所以用它完成此次功能開發

一、本文涉及知識點

  • excel文件讀寫--阿里easyexcel sdk
  • 文件上傳、下載--騰訊云對象存儲
  • 遠程服務調用--restTemplate
  • 生產者、消費者--redisTemplate leftPush和rightPop操作
  • 異步處理數據--Executors線程池
  • 讀取網絡文件流--HttpClient
  • 自定義注解實現用戶身份認證--JWT token認證, 攔截器攔截標注有@LoginRequired注解的請求入口

當然, Java實現咯

涉及的知識點比較多, 每一個知識點都可以作為專題進行學習分析, 本文將完整實現呈現出來, 后期拆分與小伙伴分享學習

二、項目目錄結構

Spring boot項目redisTemplate實現輕量級消息隊列的方法

說明: 數據庫DAO層放到另一個模塊了, 不是本文重點

三、主要maven依賴

1、easyexcel

<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>

  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>easyexcel</artifactId>
   <version>${easyexcel-latestVersion}</version>
  </dependency>

JWT

  <dependency>
   <groupId>io.jsonwebtoken</groupId>
   <artifactId>jjwt</artifactId>
   <version>0.7.0</version>
  </dependency>

redis

  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-redis</artifactId>
   <version>1.3.5.RELEASE</version>
  </dependency>

騰訊cos

  <dependency>
   <groupId>com.qcloud</groupId>
   <artifactId>cos_api</artifactId>
   <version>5.4.5</version>
  </dependency>

四、流程

  1. 用戶上傳文件
  2. 將文件存儲到騰訊cos
  3. 將上傳后的文件id及上傳記錄保存到數據庫
  4. redis生產一條導入消息, 即保存文件id到redis
  5. 請求結束, 返回"處理中"狀態
  6. redis消費消息
  7. 讀取cos文件, 異步處理數據
  8. 將錯誤數據以excel形式上傳至cos, 以供用戶下載, 并更新處理狀態為"處理完成"
  9. 客戶端輪詢查詢處理狀態, 并可以下載錯誤文件
  10. 結束

五、實現效果

上傳文件

Spring boot項目redisTemplate實現輕量級消息隊列的方法

數據庫導入記錄

Spring boot項目redisTemplate實現輕量級消息隊列的方法

導入的數據

Spring boot項目redisTemplate實現輕量級消息隊列的方法

下載錯誤文件

Spring boot項目redisTemplate實現輕量級消息隊列的方法

錯誤數據提示

Spring boot項目redisTemplate實現輕量級消息隊列的方法

查詢導入記錄

Spring boot項目redisTemplate實現輕量級消息隊列的方法

六、代碼實現

1、導入excel控制層

  @LoginRequired
  @RequestMapping(value = "doImport", method = RequestMethod.POST)
  public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
    PLUser user = getUser(request);
    return orderImportService.doImport(file, user.getId());
  }

2、service層

  @Override
  public JsonResponse doImport(MultipartFile file, Integer userId) {
    if (null == file || file.isEmpty()) {
      throw new ServiceException("文件不能為空");
    }

    String filename = file.getOriginalFilename();
    if (!checkFileSuffix(filename)) {
      throw new ServiceException("當前僅支持xlsx格式的excel");
    }

    // 存儲文件
    String fileId = saveToOss(file);
    if (StringUtils.isBlank(fileId)) {
      throw new ServiceException("文件上傳失敗, 請稍后重試");
    }

    // 保存記錄到數據庫
    saveRecordToDB(userId, fileId, filename);

    // 生產一條訂單導入消息
    redisProducer.produce(RedisKey.orderImportKey, fileId);

    return JsonResponse.ok("導入成功, 處理中...");
  }

  /**
   * 校驗文件格式
   * @param fileName
   * @return
   */
  private static boolean checkFileSuffix(String fileName) {
    if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
      return false;
    }

    int pointIndex = fileName.lastIndexOf(".");
    String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
    if (".xlsx".equals(suffix)) {
      return true;
    }

    return false;
  }

  /**
   * 將文件存儲到騰訊OSS
   * @param file
   * @return
   */
  private String saveToOss(MultipartFile file) {
    InputStream ins = null;
    try {
      ins = file.getInputStream();
    } catch (IOException e) {
      e.printStackTrace();
    }

    String fileId;
    try {
      String originalFilename = file.getOriginalFilename();
      File f = new File(originalFilename);
      inputStreamToFile(ins, f);
      FileSystemResource resource = new FileSystemResource(f);

      MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
      param.add("file", resource);

      ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
      fileId = (String) responseResult.getData();
    } catch (Exception e) {
      fileId = null;
    }

    return fileId;
  }

3、redis生產者

@Service
public class RedisProducerImpl implements RedisProducer {

  @Autowired
  private RedisTemplate redisTemplate;

  @Override
  public JsonResponse produce(String key, String msg) {
    Map<String, String> map = Maps.newHashMap();
    map.put("fileId", msg);
    redisTemplate.opsForList().leftPush(key, map);
    return JsonResponse.ok();
  }

}

4、redis消費者

@Service
public class RedisConsumer {

  @Autowired
  public RedisTemplate redisTemplate;

  @Value("${txOssFileUrl}")
  private String txOssFileUrl;

  @Value("${txOssUploadUrl}")
  private String txOssUploadUrl;

  @PostConstruct
  public void init() {
    processOrderImport();
  }

  /**
   * 處理訂單導入
   */
  private void processOrderImport() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
      while (true) {
        Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
        if (null == object) {
          continue;
        }
        String msg = JSON.toJSONString(object);
        executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
      }
    });
  }

}

5、處理任務線程類

public class OrderImportTask implements Runnable {
  public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
    this.msg = msg;
    this.txOssFileUrl = txOssFileUrl;
    this.txOssUploadUrl = txOssUploadUrl;
  }
}

  /**
   * 注入bean
   */
  private void autowireBean() {
    this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
    this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
    this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
  }

  @Override
  public void run() {
    // 注入bean
    autowireBean();

    JSONObject jsonObject = JSON.parseObject(msg);
    String fileId = jsonObject.getString("fileId");

    MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
    param.add("id", fileId);

    ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
    String fileUrl = (String) responseResult.getData();
    if (StringUtils.isBlank(fileUrl)) {
      return;
    }

    InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
    List<Object> list = ExcelUtil.read(inputStream);
    process(list, fileId);
  }

  /**
   * 將文件上傳至oss
   * @param file
   * @return
   */
  private String saveToOss(File file) {
    String fileId;
    try {
      FileSystemResource resource = new FileSystemResource(file);
      MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
      param.add("file", resource);

      ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
      fileId = (String) responseResult.getData();
    } catch (Exception e) {
      fileId = null;
    }
    return fileId;
  }

說明: 處理數據的業務邏輯代碼就不用貼了

6、上傳文件到cos

  @RequestMapping("/txOssUpload")
  @ResponseBody
  public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
    if (null == file || file.isEmpty()) {
      return ResponseResult.fail("文件不能為空");
    }

    String originalFilename = file.getOriginalFilename();
    originalFilename = MimeUtility.decodeText(originalFilename);// 解決中文亂碼問題
    String contentType = getContentType(originalFilename);
    String key;

    InputStream ins = null;
    File f = null;

    try {
      ins = file.getInputStream();
      f = new File(originalFilename);
      inputStreamToFile(ins, f);
      key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
    } catch (Exception e) {
      return ResponseResult.fail(e.getMessage());
    } finally {
      if (null != ins) {
        try {
          ins.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (f.exists()) {// 刪除臨時文件
        f.delete();
      }
    }

    return ResponseResult.ok(key);
  }

  public static void inputStreamToFile(InputStream ins,File file) {
    try {
      OutputStream os = new FileOutputStream(file);
      int bytesRead = 0;
      byte[] buffer = new byte[8192];
      while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
        os.write(buffer, 0, bytesRead);
      }
      os.close();
      ins.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
    key = Uuid.getUuid() + "-" + key;
    OSSUtil.txOssUpload(inputStream, key, contentType);
    try {
      if (null != inputStream) {
        inputStream.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
    return key;
  }

  public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
    ObjectMetadata objectMetadata = new ObjectMetadata();
    try{
      int length = inputStream.available();
      objectMetadata.setContentLength(length);
    }catch (Exception e){
      logger.info(e.getMessage());
    }
    objectMetadata.setContentType(contentType);
    cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
  }

7、下載文件

  /**
   * 騰訊云文件下載
   * @param response
   * @param id
   * @return
   */
  @RequestMapping("/txOssDownload")
  public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
  }

  public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
    FileOutputStream fos = null;
    response.reset();
    OutputStream os = null;
    try {
      response.setContentType(contentType + "; charset=utf-8");
      if(!contentType.equals(PlConstans.FileContentType.image)){
        try {
          response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
        } catch (UnsupportedEncodingException e) {
          response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
          logger.error("encoding file name failed", e);
        }
      }

      os = response.getOutputStream();

      byte[] b = new byte[1024 * 1024];
      int len;
      while ((len = fileStream.read(b)) > 0) {
        os.write(b, 0, len);
        os.flush();
        try {
          if(fos != null) {
            fos.write(b, 0, len);
            fos.flush();
          }
        } catch (Exception e) {
          logger.error(e.getMessage());
        }
      }
    } catch (IOException e) {
      IOUtils.closeQuietly(fos);
      fos = null;
    } finally {
      IOUtils.closeQuietly(os);
      IOUtils.closeQuietly(fileStream);
      if(fos != null) {
        IOUtils.closeQuietly(fos);
      }
    }
  }

8、讀取網絡文件流

  /**
   * 讀取網絡文件流
   * @param url
   * @return
   */
  public static InputStream readFileFromURL(String url) {
    if (StringUtils.isBlank(url)) {
      return null;
    }

    HttpClient httpClient = new DefaultHttpClient();
    HttpGet methodGet = new HttpGet(url);
    try {
      HttpResponse response = httpClient.execute(methodGet);
      if (response.getStatusLine().getStatusCode() == 200) {
        HttpEntity entity = response.getEntity();
        return entity.getContent();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }

9、ExcelUtil

  /**
   * 讀excel
   * @param inputStream 文件輸入流
   * @return list集合
   */
  public static List<Object> read(InputStream inputStream) {
    return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
  }

  /**
   * 寫excel
   * @param data list數據
   * @param clazz
   * @param saveFilePath 文件保存路徑
   * @throws IOException
   */
  public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
    File tempFile = new File(saveFilePath);
    OutputStream out = new FileOutputStream(tempFile);
    ExcelWriter writer = EasyExcelFactory.getWriter(out);
    Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
    writer.write(data, sheet);
    writer.finish();
    out.close();
  }

說明: 至此, 整個流程算是完整了, 下面將其他知識點代碼也貼出來參考

七、其他

1、@LoginRequired注解

/**
 * 在需要登錄驗證的Controller的方法上使用此注解
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2、MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

  @ResponseBody
  @ExceptionHandler(TokenValidationException.class)
  public JsonResponse tokenValidationExceptionHandler() {
    return JsonResponse.loginInvalid();
  }

  @ResponseBody
  @ExceptionHandler(ServiceException.class)
  public JsonResponse serviceExceptionHandler(ServiceException se) {
    return JsonResponse.fail(se.getMsg());
  }

  @ResponseBody
  @ExceptionHandler(Exception.class)
  public JsonResponse exceptionHandler(Exception e) {
    e.printStackTrace();
    return JsonResponse.fail(e.getMessage());
  }

}

3、AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

  private static final String CURRENT_USER = "user";

  @Autowired
  private UserService userService;

  @Override
  public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    // 如果不是映射到方法直接通過
    if (!(handler instanceof HandlerMethod)) {
      return true;
    }
    HandlerMethod handlerMethod = (HandlerMethod) handler;
    Method method = handlerMethod.getMethod();

    // 判斷接口是否有@LoginRequired注解, 有則需要登錄
    LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
    if (methodAnnotation != null) {
      // 驗證token
      Integer userId = JwtUtil.verifyToken(request);
      PLUser plUser = userService.selectByPrimaryKey(userId);
      if (null == plUser) {
        throw new RuntimeException("用戶不存在,請重新登錄");
      }
      request.setAttribute(CURRENT_USER, plUser);
      return true;
    }
    return true;
  }

  @Override
  public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
  }

  @Override
  public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
  }
}

4、JwtUtil

  public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
  public static final String SECRET = "pl_token_secret";
  public static final String HEADER = "token";
  public static final String USER_ID = "userId";

  /**
   * 根據userId生成token
   * @param userId
   * @return
   */
  public static String generateToken(String userId) {
    HashMap<String, Object> map = new HashMap<>();
    map.put(USER_ID, userId);
    String jwt = Jwts.builder()
        .setClaims(map)
        .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
        .signWith(SignatureAlgorithm.HS512, SECRET)
        .compact();
    return jwt;
  }

  /**
   * 驗證token
   * @param request
   * @return 驗證通過返回userId
   */
  public static Integer verifyToken(HttpServletRequest request) {
    String token = request.getHeader(HEADER);
    if (token != null) {
      try {
        Map<String, Object> body = Jwts.parser()
            .setSigningKey(SECRET)
            .parseClaimsJws(token)
            .getBody();

        for (Map.Entry entry : body.entrySet()) {
          Object key = entry.getKey();
          Object value = entry.getValue();
          if (key.toString().equals(USER_ID)) {
            return Integer.valueOf(value.toString());// userId
          }
        }
        return null;
      } catch (Exception e) {
        logger.error(e.getMessage());
        throw new TokenValidationException("unauthorized");
      }
    } else {
      throw new TokenValidationException("missing token");
    }
  }

結語: OK, 搞定,睡了, 好困

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女