const { PassThrough: ThroughStream } = require('stream');
// eslint-disable-next-line node/no-extraneous-require
const duplexify = require('duplexify').obj;

// Public API of this module. Both helpers are function declarations defined
// further down in this file, so they are hoisted and can be referenced here.
module.exports = {
  streamFlatMap,
  asyncGeneratorToStream,
};

// Builds a flat-mapping duplex stream (via asyncGeneratorToStream): every chunk
// written to it is handed to "entryToStream", and the items of the child stream
// it returns are forwarded out, in order, before the next chunk is handled.
// useable with streams.pipeline
function streamFlatMap(entryToStream) {
  // Expands each incoming chunk into its child stream and relays that stream's
  // items one after another.
  const expandChunks = async function* (chunks) {
    for await (const chunk of chunks) {
      yield* entryToStream(chunk);
    }
  };

  return asyncGeneratorToStream(expandChunks);
}

// Turns an async iterator factory into an object-mode duplex stream.
//
// "factoryFn" is called once, synchronously, with the writable half of the
// duplex (a PassThrough, so it can be consumed with "for await") and must
// return an async iterable. Every item that iterable yields is written to the
// readable half of the duplex; when it finishes, the readable half is ended.
//
// - Backpressure is respected: when the readable half reports a full buffer,
//   the iterator is not advanced again until the buffer drains.
// - If the readable half is destroyed, iteration stops and the iterator is
//   closed so generator "finally" blocks get a chance to run.
// - If iterating throws, the readable half is destroyed with that error.
function asyncGeneratorToStream(factoryFn) {
  const writableStream = new ThroughStream({ objectMode: true });
  const readableStream = new ThroughStream({ objectMode: true });
  const duplex = duplexify(writableStream, readableStream);
  const asyncIter = factoryFn(writableStream);

  // Resolves once the readable half can accept more data or has been closed.
  // Listening for 'close' as well keeps the drain loop from waiting forever on
  // a stream that was destroyed while its buffer was full.
  const drainedOrClosed = () =>
    new Promise((resolve) => {
      const settle = () => {
        readableStream.off('drain', settle);
        readableStream.off('close', settle);
        resolve();
      };
      readableStream.once('drain', settle);
      readableStream.once('close', settle);
    });

  // drain iterator into readable stream (deferred so the caller can wire up
  // the returned duplex first)
  process.nextTick(async () => {
    try {
      for await (const item of asyncIter) {
        // returning out of "for await" also closes the iterator
        if (readableStream.destroyed) return;
        if (readableStream.write(item)) continue;
        await drainedOrClosed();
        if (readableStream.destroyed) return;
      }
      readableStream.end();
    } catch (err) {
      readableStream.destroy(err);
    }
  });
  return duplex;
}