万灵药流到所有订户 [英] Elixir stream to all subscribers

查看:84
本文介绍了万灵药流到所有订户的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在Elixir中实现无线电服务器

I'm trying to implement a radio server in Elixir

一个进程一直在工作,并且正在读取文件(mp3),并发布到主题:radio,当前用于测试目的的测试结束时重新开始

One process is always working and reading a file (mp3) and publish to topic ":radio", currently for test purpose when it finishes it starts over

每个连接都订阅主题:radio

Each connection subscribes to topic ":radio"

I不了解如何将块发送到所有已订阅的连接,该连接在2或3个块之后关闭

I don't understand how to send the chunks to all subscribed connections, the connection closed after 2 or 3 chunks

defmodule Plugtest do
  import Plug.Conn

  def init(opts), do: opts

  def start() do
    Plug.Adapters.Cowboy.http(Plugtest, [])
    {:ok, _pid} = PubSub.start_link()
    spawn(fn -> stream_from_file("./song.mp3", 128) end)
  end

  def call(conn, _opts) do
    conn = conn
    |> send_chunked(200)
    |> put_resp_content_type("audio/mpeg")

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio)
#    File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only
  end

  defp send_chunk_to_connection(conn) do
    receive do
      {:radio_data, data} ->
        IO.inspect "* #{inspect self()} * [ #{inspect conn.owner} ] [ #{inspect data} ]"
#        Enum.into(data, conn) # not working TODO send chunk to connection
        {:ok, conn} = chunk(conn, data)
        send_chunk_to_connection(conn)
    end
  end

  defp stream_from_file(fpath, bytes) do
    File.stream!(fpath, [], bytes)
    |> Enum.each(fn chunk ->
      PubSub.publish(:radio, {:radio_data, chunk})
    end)
    stream_from_file(fpath, bytes)
  end

end

Stacktrace:

Stacktrace :

[error] Process #PID<0.274.0> raised an exception
        ** (MatchError) no match of right hand side value: {:error, :closed}    
            (plugtest) lib/plugtest.ex:26: Plugtest.send_chunk_to_connection/1

依赖项:

  defp deps do
    [{:plug, "~> 1.0"}, {:cowboy, "~> 1.0"}, {:pubsub, "~> 0.0.2"}]
  end

在@maxdec评论后进行编辑

edit after @maxdec comment

defmodule Plugtest do
  import Plug.Conn

  @file_path "./song.mp3"
  @port 4000
  @chunk_size 128

  def init(opts), do: opts

  def start() do
    Plug.Adapters.Cowboy.http Plugtest, [], port: @port
    {:ok, _pid} = PubSub.start_link()
    spawn fn ->
        stream_from_file(@file_path, @chunk_size)
    end
  end

  def call(conn, _opts) do
    conn = conn
    |> send_chunked(200)
    |> put_resp_content_type("audio/mpeg")

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio)
#    File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only
    conn
  end
  defp send_chunk_to_connection(conn) do
    receive do
      {:radio_data, data} ->
        case chunk(conn, data) do
          {:ok, conn} -> send_chunk_to_connection(conn)
          {:error, err} -> IO.puts err # do nothing, as something went wrong (client disconnection or something else...)
        end
    end
  end

  defp stream_from_file(fpath, bytes) do
    File.stream!(fpath, [], bytes)
    |> Enum.each(fn chunk ->
      PubSub.publish(:radio, {:radio_data, chunk})
    end)
    stream_from_file(fpath, bytes)
  end

end


推荐答案

快速浏览后我认为您应该修复两件事:

After a quick look I think there are 2 things you should fix:


  1. PlugTest 插件,因此通话/ 2 应该返回 conn (不是您的问题)。它也应该在等待事件时阻塞(接收):

  1. PlugTest is a Plug so call/2 should return conn (that's not your issue though). It should also blocks while waiting for events (receive):

def call(conn, _opts) do
  conn = conn
  |> send_chunked(200)
  |> put_resp_content_type("audio/mpeg")

  :ok = PubSub.subscribe(self(), :radio)
  send_chunk_to_connection(conn)
end


  • send_chunk_to_connection 中,您应该这样做:

    defp send_chunk_to_connection(conn) do
      receive do
        {:radio_data, data} ->
          case chunk(conn, data) do
            {:ok, conn} -> send_chunk_to_connection(conn)
            {:error, err} -> IO.puts err; conn # do nothing, as something went wrong (client disconnection or something else...)
          end
      end
    end
    


  • 这篇关于万灵药流到所有订户的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆