Elixir:将流推送给所有订阅者 [英] Elixir stream to all subscribers
问题描述
我正在尝试在Elixir中实现无线电服务器
I'm trying to implement a radio server in Elixir
有一个进程始终在运行:它读取文件(mp3)并发布到主题 ":radio";目前出于测试目的,文件读完后会从头重新开始
One process is always running: it reads a file (mp3) and publishes to the topic ":radio". Currently, for testing purposes, it starts over when it reaches the end of the file
每个连接都订阅主题:radio
Each connection subscribes to topic ":radio"
我不了解如何将块发送到所有已订阅的连接,连接在发送 2 或 3 个块之后就关闭了
I don't understand how to send the chunks to all subscribed connections, the connection closed after 2 or 3 chunks
defmodule Plugtest do
  @moduledoc """
  Experimental radio server.

  One spawned process reads an mp3 file in fixed-size chunks and publishes
  each chunk to the `:radio` PubSub topic, starting over when the file ends.
  Every HTTP request spawns a process that is subscribed to `:radio` and
  writes the received chunks to that request's chunked response.
  """

  import Plug.Conn

  @doc "Plug callback; hands the options back unchanged."
  def init(opts) do
    opts
  end

  @doc """
  Starts the Cowboy HTTP adapter for this plug, starts PubSub, and spawns the
  process that publishes `./song.mp3` in 128-byte chunks.

  Returns the pid of the spawned publisher process.
  """
  def start do
    Plug.Adapters.Cowboy.http(__MODULE__, [])
    {:ok, _pid} = PubSub.start_link()
    spawn(fn -> stream_from_file("./song.mp3", 128) end)
  end

  @doc """
  Plug callback: opens a chunked 200 response, then spawns a process that
  forwards `:radio` messages to it and subscribes that process to the topic.

  Returns `:ok` (the value of the subscribe match), not the conn.
  """
  def call(conn, _opts) do
    # NOTE(review): the content type is set after send_chunked/2 has already
    # started the response -- confirm against the Plug docs whether the header
    # can still take effect at this point.
    chunked_conn =
      conn
      |> send_chunked(200)
      |> put_resp_content_type("audio/mpeg")

    forwarder = spawn(fn -> send_chunk_to_connection(chunked_conn) end)
    :ok = PubSub.subscribe(forwarder, :radio)
    # Test-only alternative that bypasses PubSub:
    #   File.stream!("./song.mp3", [], 128) |> Enum.into(conn)
  end

  # Waits for a `{:radio_data, data}` message, logs it, writes it to the
  # chunked response, and loops with the conn returned by `chunk/2`.
  #
  # The `{:ok, _}` match is strict: any other result from `chunk/2`, such as
  # `{:error, :closed}`, raises a MatchError and ends this process.
  defp send_chunk_to_connection(conn) do
    receive do
      {:radio_data, payload} ->
        IO.inspect("* #{inspect(self())} * [ #{inspect(conn.owner)} ] [ #{inspect(payload)} ]")
        # TODO from the original author: `Enum.into(data, conn)` did not work here.
        {:ok, next_conn} = chunk(conn, payload)
        send_chunk_to_connection(next_conn)
    end
  end

  # Publishes the file to `:radio` in chunks of `bytes` bytes, then calls
  # itself again so the file is replayed indefinitely.
  defp stream_from_file(fpath, bytes) do
    fpath
    |> File.stream!([], bytes)
    |> Enum.each(&PubSub.publish(:radio, {:radio_data, &1}))

    stream_from_file(fpath, bytes)
  end
end
Stacktrace:
Stacktrace :
[error] Process #PID<0.274.0> raised an exception
** (MatchError) no match of right hand side value: {:error, :closed}
(plugtest) lib/plugtest.ex:26: Plugtest.send_chunk_to_connection/1
依赖项:
# Dependencies of the project: Plug, the Cowboy web server, and the PubSub
# library, each with its version requirement.
defp deps do
  [
    {:plug, "~> 1.0"},
    {:cowboy, "~> 1.0"},
    {:pubsub, "~> 0.0.2"}
  ]
end
在@maxdec评论后进行编辑
edit after @maxdec comment
defmodule Plugtest do
  @moduledoc """
  Experimental radio server (revised version).

  A spawned process publishes the mp3 at `@file_path` to the `:radio` PubSub
  topic in `@chunk_size`-byte chunks, replaying the file when it ends. Every
  HTTP request spawns a process, subscribed to `:radio`, that writes the
  received chunks to that request's chunked response.
  """

  import Plug.Conn

  @file_path "./song.mp3"
  @port 4000
  @chunk_size 128

  @doc "Plug callback; hands the options back unchanged."
  def init(opts) do
    opts
  end

  @doc """
  Starts the Cowboy HTTP adapter on `@port`, starts PubSub, and spawns the
  process that publishes the file.

  Returns the pid of the spawned publisher process.
  """
  def start do
    Plug.Adapters.Cowboy.http(__MODULE__, [], port: @port)
    {:ok, _pid} = PubSub.start_link()
    spawn(fn -> stream_from_file(@file_path, @chunk_size) end)
  end

  @doc """
  Plug callback: opens a chunked 200 response, spawns a process that forwards
  `:radio` messages to it, subscribes that process, and returns the conn.
  """
  def call(conn, _opts) do
    # NOTE(review): the content type is set after send_chunked/2 has already
    # started the response -- confirm against the Plug docs whether the header
    # can still take effect at this point.
    chunked_conn =
      conn
      |> send_chunked(200)
      |> put_resp_content_type("audio/mpeg")

    # NOTE(review): call/2 returns immediately while the spawned process keeps
    # writing to the conn -- confirm the adapter keeps the connection open
    # once call/2 has returned.
    forwarder = spawn(fn -> send_chunk_to_connection(chunked_conn) end)
    :ok = PubSub.subscribe(forwarder, :radio)
    # Test-only alternative that bypasses PubSub:
    #   File.stream!("./song.mp3", [], 128) |> Enum.into(conn)
    chunked_conn
  end

  # Waits for a `{:radio_data, data}` message and writes it to the chunked
  # response. On success it loops with the updated conn; on error it prints
  # the reason and stops looping (client disconnection or something else).
  defp send_chunk_to_connection(conn) do
    receive do
      {:radio_data, payload} ->
        case chunk(conn, payload) do
          {:error, reason} -> IO.puts(reason)
          {:ok, next_conn} -> send_chunk_to_connection(next_conn)
        end
    end
  end

  # Publishes the file to `:radio` in chunks of `bytes` bytes, then calls
  # itself again so the file is replayed indefinitely.
  defp stream_from_file(fpath, bytes) do
    fpath
    |> File.stream!([], bytes)
    |> Enum.each(&PubSub.publish(:radio, {:radio_data, &1}))

    stream_from_file(fpath, bytes)
  end
end
推荐答案
快速浏览后我认为您应该修复两件事:
After a quick look I think there are 2 things you should fix:
- `Plugtest` 是一个 `Plug`,因此 `call/2` 应该返回 `conn`(不过这并不是导致您问题的原因)。它还应该在等待事件(`receive`)时保持阻塞:
`Plugtest` is a `Plug`, so `call/2` should return `conn` (that's not your issue though). It should also block while waiting for events (`receive`):
@doc """
Plug callback: streams the `:radio` topic to the client as a chunked
`audio/mpeg` response.

The process handling the request subscribes itself to `:radio` and then
blocks in `send_chunk_to_connection/1`; the result of that call is returned.
"""
def call(conn, _opts) do
  # Set the content type BEFORE send_chunked/2: that call sends the status
  # line and headers, so a header added afterwards cannot reach the client
  # (Plug documents put_resp_header/3 as raising AlreadySentError once the
  # connection is :chunked).
  conn =
    conn
    |> put_resp_content_type("audio/mpeg")
    |> send_chunked(200)

  :ok = PubSub.subscribe(self(), :radio)
  send_chunk_to_connection(conn)
end
- 在 `send_chunk_to_connection` 中,您应该这样做:
# Forwards radio data to the client until a chunk can no longer be written.
#
# Blocks until a `{:radio_data, data}` message arrives, writes the data to
# the chunked response, and loops with the updated conn. When `chunk/2`
# returns an error (client disconnection or something else) the reason is
# printed and the conn passed to this call is returned, which ends the loop.
defp send_chunk_to_connection(conn) do
  receive do
    {:radio_data, payload} ->
      case chunk(conn, payload) do
        {:error, reason} ->
          IO.puts(reason)
          conn

        {:ok, updated_conn} ->
          send_chunk_to_connection(updated_conn)
      end
  end
end
这篇关于 Elixir 将流推送给所有订阅者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!