How to resume an Elixir task from where it broke (Elixir, Phoenix)


Question

We are using SeaweedFS, a file system for storing (image) files that is exposed as a REST API. We are trying to move data from one server to another.

There are several levels of data directories. The basic pattern under which an image is stored is:

http://{server}:8888/ballymore-project-wave/snapshots/recordings/{year}/{month}/{day}/{hour}/00_00_000.jpg

Each directory level returns its own listing as JSON, such as:

{
    "Path": "/ballymore-project-wave/snapshots/recordings/",
    "Files": null,
    "Directories": [
        {
            "Name": "2016",
            "Id": 91874
        },
        {
            "Name": "2017",
            "Id": 1538395
        }
    ],
    "Limit": 100,
    "LastFileName": "",
    "ShouldDisplayLoadMore": false
}

The response above is what you get when you list the years of recordings; listing months, days, and hours returns the same shape. The response changes slightly when you fetch the contents of a single hour:

{
    "Path": "/ballymore-project-wave/snapshots/recordings/2016/11/02/01/",
    "Files": [
        {
            "name": "00_00_000.jpg",
            "fid": "29515,744a5a496b97ff98"
        },
        {
            "name": "00_01_000.jpg",
            "fid": "29514,744a5aa52ea3cf3d"
        }
    ],
    "Directories": null,
    "Limit": 100,
    "LastFileName": "02_15_000.jpg",
    "ShouldDisplayLoadMore": true
}

Now we need to move all this data from one server to another. I wrote this script for it:

  defp move_snapshots(exids) do
    exids
    |> Enum.each(fn (exid) ->
      request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
      |> Enum.sort |> Enum.each(fn (year) ->
        request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
        |> Enum.sort |> Enum.each(fn (month) ->
          request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
          |> Enum.sort |> Enum.each(fn (day) ->
            request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
            |> Enum.sort |> Enum.each(fn (hour) ->
              request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
              |> Enum.sort |> Enum.each(fn (file) ->
                exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
                |> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
              end)
            end)
          end)
        end)
      end)
    end)
  end

This is the main function. exids is the list of string identifiers for all cameras; in the example above, the identifier is ballymore-project-wave.

In the script above, I check each level and, if something is present, go one level deeper. At the last level, I check whether the file is a valid image:

  defp exist_on_seaweed?(url) do
    hackney = [pool: :seaweedfs_download_pool, recv_timeout: 30_000_000]
    case HTTPoison.get("#{@seaweedfs}#{url}", ["Accept": "application/json"], hackney: hackney) do
      {:ok, %HTTPoison.Response{status_code: 200, body: data}} -> {:ok, data}
      _error ->
        :not_found
    end
  end

  defp copy_or_skip(:not_found, _path), do: :noop
  defp copy_or_skip({:ok, data}, path) do
    hackney = [pool: :seaweedfs_upload_pool]
    case HTTPoison.post("#{@seaweedfs_new}#{path}", {:multipart, [{path, data, []}]}, [], hackney: hackney) do
      {:ok, _response} -> Logger.info "[seaweedfs_save]"
      {:error, error} -> Logger.info "[seaweedfs_save] [#{inspect error}]"
    end
  end

This all works fine, but I have a problem resuming it when it crashes or breaks for some reason, and I need guidance or ideas for this. As you can see, if there are 200 camera exids and it breaks at 100 or fewer, it does resume, but from the very start. We cannot delete anything on the old server until the whole move is complete. Any help will be appreciated. Suggestions for improving the code would also be helpful.

Recommended answer

Until you post the actual stack trace or details of the error you are encountering, it's not possible to figure out exactly what's wrong. But for starters, here are some suggestions that might help:

  • You should break down your move_snapshots function into something more understandable, for example by using Enum.reduce/3 with recursion and calling your copy_or_skip function as the base case.
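As a rough illustration of that suggestion, the five nested Enum.each calls can collapse into one recursive function. This is only a sketch: SnapshotWalker, file_paths/3 and list_fun are hypothetical names, and list_fun stands in for request_from_seaweedfs/3, whose implementation is not shown in the question. It uses Enum.flat_map/2 rather than Enum.reduce/3 and returns the file paths instead of copying them directly, so the listing step can be tested without a server.

```elixir
defmodule SnapshotWalker do
  # Hypothetical module, not part of the original script.
  # `list_fun` is an assumed stand-in for request_from_seaweedfs/3:
  #   list_fun.(dir, :directories) returns the sub-directory names of `dir`
  #   list_fun.(dir, :files)       returns the file names in `dir`
  # It is assumed to return [] where the API reports null.

  # `depth` is the number of directory levels above the files
  # (4 for year/month/day/hour).
  def file_paths(base, list_fun, depth \\ 4), do: walk(base, depth, list_fun)

  # Base case: `dir` holds files (an hour directory), so return their paths.
  defp walk(dir, 0, list_fun) do
    list_fun.(dir, :files)
    |> Enum.sort()
    |> Enum.map(fn name -> dir <> name end)
  end

  # Recursive case: descend one level into every sub-directory.
  defp walk(dir, depth, list_fun) when depth > 0 do
    list_fun.(dir, :directories)
    |> Enum.sort()
    |> Enum.flat_map(fn name -> walk(dir <> name <> "/", depth - 1, list_fun) end)
  end
end
```

Each returned path could then be passed to exist_on_seaweed?/1 and copy_or_skip/2 exactly as in the innermost loop of the original function.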

  • Try wrapping your copy_or_skip function body in a try/rescue, rescuing any exception, logging it, and moving on to the next file.

defp copy_or_skip(args, path) do
  # Your Implementation
rescue
  error -> Logger.error("Exception caught on #{inspect(path)}\n#{inspect(error)}")
end

  • You could also just go through the list of all files and add the valid paths to a worker in a job-processing library such as Que or Toniq. The library will perform all the move operations and mark each one as successful or failed. You can then go back to see which operations failed and figure out what caused them, or automatically restart the failed ones.
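If pulling in a job library is more than you need, the same "mark as done, retry the rest" idea can be approximated with a plain checkpoint file. The sketch below is not Que or Toniq and does not use their APIs. MoveCheckpoint, the checkpoint file, and move_fun are all hypothetical, and move_fun is assumed to return :ok only when an item (a camera exid, or a single file path) has been moved completely.

```elixir
defmodule MoveCheckpoint do
  # Hypothetical module, not part of the original script.

  # Returns the set of ids already recorded as done (empty if no file yet).
  def load(file) do
    case File.read(file) do
      {:ok, contents} -> contents |> String.split("\n", trim: true) |> MapSet.new()
      {:error, :enoent} -> MapSet.new()
    end
  end

  # Appends one finished id; call this only after the move succeeded.
  def mark_done(file, id), do: File.write!(file, id <> "\n", [:append])

  # Runs `move_fun` for every id that is not yet in the checkpoint file.
  # Failed ids are reported and left out of the file, so the next run retries them.
  def run(ids, file, move_fun) do
    done = load(file)

    ids
    |> Enum.reject(fn id -> MapSet.member?(done, id) end)
    |> Enum.each(fn id ->
      case move_fun.(id) do
        :ok -> mark_done(file, id)
        other -> IO.puts(:stderr, "move failed for #{id}: #{inspect(other)}")
      end
    end)
  end
end
```

After a crash, calling run/3 again with the same list and file skips everything already recorded. Checkpointing per exid means a half-moved camera is redone from its start; checkpointing per file path is finer-grained but makes the file much larger.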

    Some more tips on improving code reliability and performance:

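    • Use Stream, or better yet Flow, to divide the work and process it in parallel.
    • Perform the actual move operations in separate Task processes, preferably managed by a Supervisor (optionally using a pool).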
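The two tips above can be combined using only the standard library: Task.Supervisor.async_stream_nolink/4 runs a bounded number of supervised tasks at once and keeps the caller alive if a task crashes. This is a sketch under assumptions, not a Flow example. ParallelMove and move_fun are hypothetical names, and the concurrency and timeout defaults are arbitrary.

```elixir
defmodule ParallelMove do
  # Hypothetical module, not part of the original script.
  # `move_fun` is an assumed one-argument function that moves a single path
  # and returns :ok on success. `paths` is expected to be a list.

  # Returns the paths whose task did not return :ok
  # (an error value, a crash, or a timeout).
  def run(paths, move_fun, opts \\ []) do
    {:ok, sup} = Task.Supervisor.start_link()

    results =
      sup
      |> Task.Supervisor.async_stream_nolink(paths, move_fun,
        max_concurrency: Keyword.get(opts, :max_concurrency, 8),
        timeout: Keyword.get(opts, :timeout, 60_000),
        on_timeout: :kill_task
      )
      |> Enum.to_list()

    # Results arrive in input order, so they can be zipped back onto the paths.
    failed =
      paths
      |> Enum.zip(results)
      |> Enum.reject(fn {_path, result} -> result == {:ok, :ok} end)
      |> Enum.map(fn {path, _result} -> path end)

    Supervisor.stop(sup)
    failed
  end
end
```

In a Phoenix application the Task.Supervisor would more naturally live in the application's supervision tree rather than being started per call. The returned list of failed paths can be fed back into another run to retry them.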
