surroundize

Tool/PipeWire filter to convert stereo audio to surround
git clone https://git.neptards.moe/u3shit/surroundize.git
Log | Files | Refs | README | LICENSE

pipewire.cpp (12684B)


      1 /* PipeWire */
      2 /* SPDX-FileCopyrightText: Copyright © 2019 Wim Taymans */
      3 /* SPDX-License-Identifier: MIT */
      4 
      5 #include <algorithm>
      6 #include <cerrno>
      7 #include <csignal>
      8 #include <cstdint>
      9 #include <cstdio>
     10 #include <cstring>
     11 #include <iostream>
     12 #include <span>
     13 #include <string>
     14 
     15 #include <spa/pod/builder.h>
     16 #include <spa/param/latency-utils.h>
     17 
     18 #include <pipewire/pipewire.h>
     19 #include <pipewire/filter.h>
     20 
     21 #include "decoder.hpp"
     22 #include "file_wrapper.hpp"
     23 #include "options.hpp"
     24 
     25 using namespace std::string_literals;
     26 
     27 static constexpr unsigned INPUTS = 2;
     28 static constexpr const char* INPUT_NAMES[INPUTS] = { "input_FL", "input_FR" };
     29 
     30 namespace
     31 {
     32   struct Data
     33   {
     34     pw_main_loop* loop;
     35     pw_filter* filter;
     36     std::array<void*, INPUTS> in;
     37     std::array<void*, Channel::N_CHANNELS> out;
     38     std::array<void*, Channel::N_CHANNELS> out_by_channel;
     39     std::uint32_t n_outputs = 0, old_latency = -1;
     40     File in_dump, out_dump;
     41 
     42     std::vector<Option> opts;
     43     Decoder dec;
     44     bool force_quantum = true;
     45     bool reset_pending = true;
     46     float preamp = 1;
     47     float old_rate = -1;
     48     std::vector<float> in_buf;
     49     std::uint32_t in_buf_pos = 0;
     50 
     51     std::span<const float> out_buf;
     52 
     53     std::vector<char> cmd_buf;
     54 
     55     void TryFlush(std::uint32_t size);
     56     void Process(struct spa_io_position* position);
     57 
     58     void CreateOutputs();
     59     void ParseArgsFun(std::string_view k, std::string_view v);
     60     void Input(int fd);
     61     void ProcessLine(std::string_view sv) noexcept;
     62   };
     63 }
     64 
     65 template <typename T, std::size_t InN, typename U>
     66 static void Mux(File& dump, std::span<T*, InN> ins, T* out, std::size_t n,
     67                 U mul)
     68 {
     69   auto outp = out;
     70   if (std::all_of(ins.begin(), ins.end(), [](T* t) { return t; }))
     71     // fast path, all inputs exist
     72     for (std::size_t i = 0; i < n; i += ins.size())
     73       for (std::size_t j = 0; j < ins.size(); ++j)
     74         *outp++ = *ins[j]++ * mul;
     75   else
     76     for (std::size_t i = 0; i < n; i += ins.size())
     77       for (std::size_t j = 0; j < ins.size(); ++j)
     78         *outp++ = ins[j] ? *ins[j]++ * mul : 0;
     79 
     80   if (dump) fwrite(out, sizeof(T), n, dump);
     81 }
     82 
     83 template <typename T, std::size_t OutN>
     84 static void Demux(
     85   File& dump, const T* inp, std::span<T*, OutN> outs, std::size_t n)
     86 {
     87   if (dump) fwrite(inp, sizeof(float), n, dump);
     88 
     89   if (std::all_of(outs.begin(), outs.end(), [](T* t) { return t; }))
     90     // fast path, all outputs exist
     91     for (std::size_t i = 0; i < n; i += outs.size())
     92       for (std::size_t j = 0; j < outs.size(); ++j)
     93         *outs[j]++ = *inp++;
     94   else
     95     for (std::size_t i = 0; i < n; i += outs.size())
     96       for (std::size_t j = 0; j < outs.size(); ++j)
     97       {
     98         if (outs[j]) *outs[j]++ = *inp;
     99         ++inp;
    100       }
    101 }
    102 
    103 template <std::size_t N>
    104 static std::pair<bool, std::array<float*, N>> GetBufs(
    105   std::array<void*, N> ports, std::uint32_t n, std::uint32_t n_samples)
    106 {
    107   std::array<float*, N> res;
    108   bool has = false;
    109   for (std::size_t i = 0; i < n; ++i)
    110   {
    111     res[i] = static_cast<float*>(pw_filter_get_dsp_buffer(ports[i], n_samples));
    112     if (res[i]) has = true;
    113   }
    114   return {has, res};
    115 }
    116 
    117 void Data::Process(struct spa_io_position* position)
    118 {
    119   std::uint32_t n_samples = position->clock.duration;
    120   auto rate = float(position->clock.rate.num) / float(position->clock.rate.denom);
    121   std::uint32_t block_size = dec.GetBlockSize();
    122   std::uint32_t buf_size = INPUTS * (block_size ? block_size : n_samples);
    123   pw_log_trace("do process %" PRIu32, n_samples);
    124 
    125   auto latency = dec.GetDelay();
    126   if (n_samples < block_size) latency += block_size - n_samples;
    127   if (latency != old_latency)
    128   {
    129     uint8_t buffer[1024];
    130     struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
    131     auto li = SPA_PROCESS_LATENCY_INFO_INIT(.rate = int(latency));
    132     const struct spa_pod* params[1] = {
    133       spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, &li),
    134     };
    135     pw_filter_update_params(filter, nullptr, params, 1);
    136     old_latency = latency;
    137   }
    138 
    139   auto [has_in, ins] = GetBufs(in, INPUTS, n_samples);
    140   auto [has_out, outs] = GetBufs(out, n_outputs, n_samples);
    141 
    142   if (!has_in && !has_out)
    143   {
    144     pw_log_trace("Nothing to do");
    145     reset_pending = true;
    146     return;
    147   }
    148   if (!has_in && out_buf.empty())
    149   {
    150     pw_log_trace("Generating silence");
    151     for (unsigned i = 0; i < n_outputs; ++i)
    152       if (outs[i]) std::memset(outs[i], 0, sizeof(float) * n_samples);
    153     return;
    154   }
    155 
    156   if (reset_pending || rate != old_rate)
    157   {
    158     dec.Init(rate);
    159     old_rate = rate;
    160     in_buf_pos = 0;
    161     out_buf = {};
    162     reset_pending = false;
    163   }
    164 
    165   in_buf.resize(buf_size);
    166   if (in_buf_pos > buf_size) in_buf_pos = 0;
    167   auto to_read = INPUTS * n_samples, to_write = n_outputs * n_samples;
    168 
    169   auto write_out = [&]()
    170   {
    171     auto now_write = std::min(to_write, std::uint32_t(out_buf.size()));
    172     if (!now_write) return;
    173     Demux(out_dump, out_buf.data(), std::span{outs}.subspan(0, n_outputs),
    174           now_write);
    175     out_buf = out_buf.subspan(now_write);
    176     to_write -= now_write;
    177   };
    178 
    179   write_out();
    180   while (to_read)
    181   {
    182     auto now_read = std::min<std::uint32_t>(to_read, buf_size - in_buf_pos);
    183     Mux(in_dump, std::span{ins}, in_buf.data() + in_buf_pos, now_read, preamp);
    184     to_read -= now_read, in_buf_pos += now_read;
    185 
    186     if (in_buf_pos == buf_size)
    187     {
    188       if (!out_buf.empty()) pw_log_warn("Discarding non-empty buffer!");
    189       pw_log_trace("Calling decode");
    190       out_buf = dec.Decode(in_buf);
    191       in_buf_pos = 0;
    192     }
    193 
    194     write_out();
    195   }
    196 
    197   if (to_write)
    198   {
    199     to_write /= n_outputs;
    200     pw_log_debug("Memsetting remaining %" PRIu32 " samples", to_write);
    201     for (unsigned i = 0; i < n_outputs; ++i)
    202       if (outs[i]) std::memset(outs[i], 0, sizeof(float) * to_write);
    203   }
    204 }
    205 
    206 void Data::Input(int fd)
    207 {
    208   constexpr std::size_t READ_SIZE = 4096;
    209   auto off = cmd_buf.size();
    210   cmd_buf.resize(off + READ_SIZE);
    211   // noexcept from here
    212   auto rd = read(fd, cmd_buf.data() + off, READ_SIZE);
    213   if (rd < 0) { cmd_buf.resize(off); return; }
    214   cmd_buf.resize(off + rd);
    215 
    216   auto st = cmd_buf.begin();
    217   while (st < cmd_buf.end())
    218     if (auto end = std::find(st, cmd_buf.end(), '\n'); end != cmd_buf.end())
    219     {
    220       ProcessLine({st, end});
    221       st = end+1;
    222     }
    223     else if (rd == 0)
    224     {
    225       ProcessLine({st, cmd_buf.end()});
    226       st = cmd_buf.end();
    227     }
    228   cmd_buf.erase(cmd_buf.begin(), st);
    229 }
    230 
    231 void Data::ParseArgsFun(std::string_view k, std::string_view v)
    232 {
    233   if (HandleOption(opts, k, v)) return;
    234   auto dopt = dec.GetOptions();
    235   if (HandleOption(dopt, k, v)) return;
    236   throw ParseError("Unknown option "s + std::string{k});
    237 }
    238 
    239 void Data::ProcessLine(std::string_view sv) noexcept
    240 {
    241   try
    242   {
    243     std::vector<std::string_view> parts;
    244     for (std::size_t st = 0; st < sv.size(); )
    245       if (sv[st] == ' ' || sv[st] == '\t') ++st;
    246       else if (auto p = sv.find_first_of(" \t", st); p != std::string_view::npos)
    247       {
    248         parts.push_back(sv.substr(st, p-st));
    249         st = p+1;
    250       }
    251       else
    252       {
    253         parts.push_back(sv.substr(st));
    254         break;
    255       }
    256 
    257     ParseOptions(parts, std::bind_front(&Data::ParseArgsFun, this));
    258 
    259     out_buf = {}; // FIXME: better way
    260     // update quantum
    261     {
    262       auto quantum = std::to_string(force_quantum ? dec.GetBlockSize() : 0);
    263       spa_dict_item upd[] = {
    264         SPA_DICT_ITEM_INIT(PW_KEY_NODE_FORCE_QUANTUM, quantum.c_str()),
    265       };
    266       auto ud = SPA_DICT_INIT(upd, std::size(upd));
    267       pw_filter_update_properties(filter, nullptr, &ud);
    268     }
    269 
    270     CreateOutputs();
    271   }
    272   catch (Exit) {} // ignore Exit from help
    273   catch (const std::exception& e)
    274   { std::cerr << "Failed to execute command: " << e.what() << std::endl; }
    275 }
    276 
    277 void Data::CreateOutputs()
    278 {
    279   auto outputs = dec.GetChannels();
    280   n_outputs = outputs.size();
    281 
    282   std::array<bool, Channel::N_CHANNELS> used{};
    283   for (std::uint32_t i = 0; i < n_outputs; ++i)
    284   {
    285     auto ch = outputs[i];
    286     used[ch] = true;
    287     if (!out_by_channel[ch])
    288     {
    289       auto pw_name = Channel::NAMES_PIPEWIRE[ch];
    290       out_by_channel[ch] = pw_filter_add_port(
    291         filter,
    292         PW_DIRECTION_OUTPUT,
    293         PW_FILTER_PORT_FLAG_MAP_BUFFERS,
    294         0, // sizeof(Port)
    295         pw_properties_new(
    296           PW_KEY_FORMAT_DSP, "32 bit float mono audio",
    297           PW_KEY_AUDIO_CHANNEL, pw_name,
    298           PW_KEY_PORT_NAME, ("output_"s + pw_name).c_str(),
    299           nullptr),
    300         nullptr, 0);
    301     }
    302     out[i] = out_by_channel[ch];
    303   }
    304 
    305   for (unsigned ch = 0; ch < Channel::N_CHANNELS; ++ch)
    306     if (!used[ch] && out_by_channel[ch])
    307     {
    308       pw_filter_remove_port(out_by_channel[ch]);
    309       out_by_channel[ch] = nullptr;
    310     }
    311 }
    312 
    313 static const struct pw_filter_events filter_events =
    314 {
    315   .version = PW_VERSION_FILTER_EVENTS,
    316   .process = [](void* userdata, struct spa_io_position* position)
    317   { reinterpret_cast<Data*>(userdata)->Process(position); },
    318 };
    319 
    320 static void DoQuit(void *userdata, int signal_number)
    321 {
    322   (void) signal_number;
    323   pw_main_loop_quit(reinterpret_cast<Data*>(userdata)->loop);
    324 }
    325 
    326 static File OpenFile(std::string_view sv)
    327 {
    328   if (sv.empty()) return {};
    329   else return {std::string{sv}.c_str(), "wb"};
    330 }
    331 
    332 int Main(int argc, char *argv[])
    333 {
    334   Data data = {};
    335   data.opts = {
    336     {
    337       "help", "", "Show help",
    338       [&](std::string_view)
    339       {
    340         std::cerr << "Usage " << argv[0] << " [--options]\n";
    341         PrintHelp(data.opts);
    342         data.dec.PrintHelp();
    343         throw Exit{0};
    344       },
    345     },
    346     {
    347       "dump_input", "FILENAME", "Dump input to file",
    348       [&data](std::string_view sv) { data.in_dump = OpenFile(sv); },
    349     },
    350     {
    351       "dump_output", "FILENAME", "Dump output to file",
    352       [&data](std::string_view sv) { data.out_dump = OpenFile(sv); },
    353     },
    354     {
    355       "force_quantum", "BOOL",
    356       "Force Pipewire to use the decoder's block size as quantum",
    357       [&data](std::string_view sv) { data.force_quantum = FromString<bool>(sv); },
    358     },
    359     {
    360       "preamp", "FLOAT",
    361       "Multiply all input values by this (default: 1)",
    362       [&data](std::string_view sv) { data.preamp = FromString<float>(sv); },
    363     },
    364   };
    365 
    366   ParseArgs(argc, argv, std::bind_front(&Data::ParseArgsFun, &data));
    367 
    368   data.opts.emplace_back(
    369     "reset", "", "Reset the current decoder",
    370     [&](auto) { data.reset_pending = true; });
    371   data.opts.emplace_back(
    372     "quit", "", "Quits the application",
    373     [&](auto) { pw_main_loop_quit(data.loop); });
    374 
    375   pw_init(&argc, &argv);
    376 
    377   /* make a main loop. If you already have another main loop, you can add
    378    * the fd of this pipewire mainloop to it. */
    379   data.loop = pw_main_loop_new(nullptr);
    380   auto loop = pw_main_loop_get_loop(data.loop);
    381 
    382   pw_loop_add_signal(loop, SIGINT, DoQuit, &data);
    383   pw_loop_add_signal(loop, SIGTERM, DoQuit, &data);
    384   pw_loop_add_io(
    385     loop, STDIN_FILENO, SPA_IO_IN, false,
    386     [](void* userdata, int fd, uint32_t mask)
    387     {
    388       (void) mask;
    389       reinterpret_cast<Data*>(userdata)->Input(fd);
    390     }, &data);
    391 
    392   /* Create a simple filter, the simple filter manages the core and remote
    393    * objects for you if you don't need to deal with them.
    394    *
    395    * Pass your events and a user_data pointer as the last arguments. This
    396    * will inform you about the filter state. The most important event
    397    * you need to listen to is the process event where you need to process
    398    * the data.
    399    */
    400   data.filter = pw_filter_new_simple(
    401     loop,
    402     "surroundize-pipewire",
    403     pw_properties_new(
    404       PW_KEY_MEDIA_TYPE, "Audio",
    405       PW_KEY_MEDIA_CATEGORY, "Filter",
    406       PW_KEY_MEDIA_ROLE, "DSP",
    407       PW_KEY_MEDIA_CLASS, "Stream/Output/Audio",
    408       PW_KEY_APP_NAME, "surroundize-pipewire",
    409       PW_KEY_NODE_FORCE_QUANTUM,
    410       std::to_string(data.force_quantum ? data.dec.GetBlockSize() : 0).c_str(),
    411       PW_KEY_NODE_PASSIVE, "true",
    412       nullptr),
    413     &filter_events,
    414     &data);
    415 
    416   for (unsigned i = 0; i < INPUTS; ++i)
    417     data.in[i] = pw_filter_add_port(
    418       data.filter,
    419       PW_DIRECTION_INPUT,
    420       PW_FILTER_PORT_FLAG_MAP_BUFFERS,
    421       0, // sizeof(Port)
    422       pw_properties_new(
    423         PW_KEY_FORMAT_DSP, "32 bit float mono audio",
    424         PW_KEY_AUDIO_CHANNEL, INPUT_NAMES[i] + 6,
    425         PW_KEY_PORT_NAME, INPUT_NAMES[i],
    426         nullptr),
    427       nullptr, 0);
    428 
    429   data.CreateOutputs();
    430 
    431   /* Now connect this filter. We ask that our process function is
    432    * called in a realtime thread. */
    433   if (pw_filter_connect(data.filter,
    434                         PW_FILTER_FLAG_NONE, //PW_FILTER_FLAG_RT_PROCESS,
    435                         nullptr, 0) < 0)
    436   {
    437     std::cerr << "can't connect\n";
    438     return -1;
    439   }
    440 
    441   /* and wait while we let things run */
    442   pw_main_loop_run(data.loop);
    443 
    444   pw_filter_destroy(data.filter);
    445   pw_main_loop_destroy(data.loop);
    446   pw_deinit();
    447 
    448   return 0;
    449 }