import { removeOne } from "../collections/array"
import { sleep } from "../promise"
import { TimestampInMilliseconds } from "../time"

require( '../../utils/extended-prototypes' )

export type Output<T> = ( ...values: T[] ) => void

export interface iConsumer<T> {
  push<U extends T>( ...events: U[] ): void
  attachSource<U extends T>( source: Producer<U> ): void
}

export interface Producer<T> {
  addOutput( output: Output<T> ): void
  removeOutput( output: Output<T> ): void
}

export interface iSpliter<T> {
  on<U extends T>( channel: string, output: Output<U> ): void
}

export function isSplitter<T,U>( x: T | iSpliter<U> ): x is T & iSpliter<U> {
  return ( x as iSpliter<U> ).on != null
}

export function isMixer<T,U>( x: T | Mixer<U> ): x is T & Mixer<U> {
  return ( x as Mixer<U> ).enableChannel != null
}


type Tagger<T> = ( v:T ) => string

export class Splitter<T> implements iConsumer<T>, iSpliter<T> {
  private readonly outputsByTag: Record<string, Output<T>[]>

  constructor( tagger: Tagger<T> ) {
    this.outputsByTag = {}
    const outputsByTag = this.outputsByTag
    this.push = <U extends T>( ...events: U[] ) => {
      for ( const event of events ) {
        const outputs = outputsByTag[  tagger( event ) ]
        if ( outputs != null ) {
          for ( const output of outputs ) {
            process.nextTick( output, event )
          }
        }
      }
    }
  }
 
  attachSource<U extends Producer<T>>( source: U ): void {
    source.addOutput( this.push )
  }

  readonly push: <U extends T>( ...events: U[] ) => void

  on<U extends T>( type: string, output: Output<U> ): this {
    const outputs = ( this.outputsByTag[type] ??= [] )
    /* 
      casting is used because TS have problem 
      with correct recognize type U which extends T
    */
    outputs.push( output as Output<T> )
    return this
  }

  pipeTo<U extends Mixer<T>>( mixer: U ): U {
    mixer.attachSource( this )
    return mixer
  }
}


export class Passthrough<T> implements iConsumer<T>, Producer<T> {
  protected readonly outputs: Output<T>[]
  name: string

  constructor( name = '[Passthrough]' ) {
    this.name = name
    this.outputs = []
  }

  attachSource<U extends T>( source: Producer<U> ): void {
    source.addOutput( this.push )
  }

  public push = <U extends T>( ...values: U[] ): void => {
    for ( const output of this.outputs ) {
      process.nextTick( output, ...values )
    }
  }

  addOutput( output: Output<T> ): void {
    this.outputs.push( output )
  }

  removeOutput( output: Output<T> ): void {
    removeOne( this.outputs, output )
  }
}

export class UnsafePassthrough<In,Out> implements iConsumer<In>, Producer<Out> {
  private readonly outputs: Output<Out>[]

  constructor() {
    this.outputs = []
  }

  attachSource<U extends In>( source: Producer<U> ): void {
    source.addOutput( this.push )
  }

  removeOutput( output: Output<Out> ): void {
    removeOne( this.outputs, output )
  }

  public push = <U extends In>( ...values: U[] ): void => {
    for ( const output of this.outputs ) {
      process.nextTick( output, ...values )
    }
  }

  addOutput( output: Output<Out> ): void {
    this.outputs.push( output )
  }
}

export class Mixer<In,Out extends In = In> implements Producer<In> {
  readonly name: string
  private readonly stream: UnsafePassthrough<In,Out>
  private readonly sources: iSpliter<In>[]
  private readonly channels: Set<string>

  constructor( name = '[Mixer]' ) {
    this.name = name
    this.channels = new Set()
    this.sources = []
    this.stream = new UnsafePassthrough()
  }
  removeOutput( output: Output<In> ): void {
    this.stream.removeOutput( output )
  }

  addOutput( output: Output<In> ): void {
    this.stream.addOutput( output )
  }

  attachSource<U extends iSpliter<In>>( source: U ): void {
    this.sources.push( source )
    for ( const channel of this.channels ) {
      source.on( channel, this.stream.push )
    }
  }

  enableChannel( channel: string ): void {
    const needRefresh = !this.channels.has( channel )
    this.channels.add( channel )
    if ( needRefresh ) {
      for ( const source of this.sources ) {
        source.on( channel, this.stream.push )
      }
    } else {
      console.error( 'Redundant channel activation', channel )
    }
  }

  pipeTo< T extends iConsumer<Out>> ( stream: T ): T {
    stream.attachSource( this.stream )
    return stream
  }

  pipeMany< T extends iConsumer<Out>> ( ...streams: T[] ): void {
    for ( const stream of streams ) {
      stream.attachSource( this.stream )
    }
  }
}

export class SplitterWithPassthrough<T> {
  readonly passthrough: Passthrough<T>
  readonly splitter: Splitter<T>
  readonly push: <U extends T>( v: U ) => void
  name: string

  constructor( tagger: Tagger<T>, name = '[SplitterWithPassthrough' ) {
    this.name = name
    this.passthrough = new Passthrough<T>()
    this.splitter = new Splitter<T>( tagger )

    this.push = <U extends T>( v: U ) => {
      this.splitter.push( v )
      this.passthrough.push( v )
    }
  }


  on<U extends T>( type: string, output: Output<U> ): void {
    this.splitter.on( type, output )
  }
  
  pipeTo<U extends Mixer<Partial<T>> >( stream: U ): U
  pipeTo<U extends iConsumer<Partial<T>>>( stream: U ): U {
    if ( isMixer<U,T>( stream ) ) {
      stream.attachSource( this.splitter )
    } else {
      stream.attachSource( this.passthrough )
    }
    return stream
  }

  pipeMany( streams: ( Mixer<T> | iConsumer<T> )[] ): void {
    for ( const stream of streams ) {
      if ( isMixer<( Mixer<T> | iConsumer<T> ),T>( stream ) ) {
        stream.attachSource( this.splitter )
      } else {
        stream.attachSource( this.passthrough )
      }
    }
  }
}

export type Transformation<In,Out> = ( v: In ) => Out
export type AsyncTransformation<In,Out> = ( v: In ) => Promise<Out>

export class AsyncTransformStream<In,Out> implements Producer<Out> {
  private readonly outputs: Output<Out>[]
  readonly push: <T extends In>( ...values: T[] ) => void

  constructor( transformation: AsyncTransformation<In,Out> ) {
    this.outputs = []

    this.push = <T extends In>( ...values: T[] ) => {
      values.asyncMap( transformation ).then( ( transformed: Out[] ) => {
        for ( const output of this.outputs ) {
          process.nextTick( output, ...transformed )
        }
      } ).catch( e => {
        // TODO add mechanism that will ensure that errors that are thrown here will not be in
        // error log stream flow and if not send them there (to error stream)
        console.error( e )
      } )
    }
  }
  removeOutput( output: Output<Out> ): void {
    removeOne( this.outputs, output )
  }

  attachSource<U extends In>( source: Producer<U> ): void {
    source.addOutput( this.push )
  }

  addOutput( v: Output<Out> ): void {
    this.outputs.push( v )
  }

  pipeTo<T extends iConsumer<Partial<Out>>> ( stream: T ): T {
    stream.attachSource( this )
    return stream
  }

  pipeMany<T extends iConsumer<Out> >( ...streams: T[] ): void {
    for ( const stream of streams ) {
      stream.attachSource( this )
    }
  }
}

export class TransformStream<In,Out> implements Producer<Out> {
  private readonly outputs: Output<Out>[]
  readonly push: <T extends In>( ...values: T[] ) => void

  constructor( transformation: Transformation<In,Out> ) {
    this.outputs = []

    this.push = <T extends In>( ...values: T[] ) => {
      const transformed = values.map( transformation )
      for ( const output of this.outputs ) {
        process.nextTick( output, ...transformed )
      }
    }
  }
  removeOutput( output: Output<Out> ): void {
    removeOne( this.outputs, output )
  }

  attachSource<U extends In>( source: Producer<U> ): void {
    source.addOutput( this.push )
  }

  addOutput( v: Output<Out> ): void {
    this.outputs.push( v )
  }

  pipeTo<T extends iConsumer<Partial<Out>>> ( stream: T ): T {
    stream.attachSource( this )
    return stream
  }

  pipeMany<T extends iConsumer<Out> >( ...streams: T[] ): void {
    for ( const stream of streams ) {
      stream.attachSource( this )
    }
  }
}

export class TapStream<T extends Partial<T> > implements Producer<T> {
  private readonly callback: Output<T>
  private readonly outputs: Output<T>[]
  readonly push: ( ...values: T[] ) => void

  constructor( callback: Output<T> ) {
    this.callback = callback
    this.outputs = []

    this.push = ( ...values: T[] ) => {
      this.callback( ...values )
      for ( const output of this.outputs ) {
        process.nextTick( output, ...values )
      }
    }
  }
  removeOutput( output: Output<T> ): void {
    removeOne( this.outputs, output )
  }

  attachSource<U extends T>( source: Producer<U> ): void {
    source.addOutput( this.push )
  }

  addOutput( v: Output<T> ): void {
    this.outputs.push( v )
  }

  pipeTo( stream: iConsumer<T> ): iConsumer<T> {
    stream.attachSource( this )
    return stream
  }
}

export class FilterStream<T extends Partial<T> > implements Producer<T> {
  private readonly predicate: (x:T) => boolean
  private readonly outputs: Output<T>[]
  readonly push: ( ...values: T[] ) => void

  constructor( predicate: (x:T) => boolean ) {
    this.predicate = predicate
    this.outputs = []

    this.push = ( ...values: T[] ) => {
      const filtered = values.filter( this.predicate )
      if (filtered.length > 0) {
        for ( const output of this.outputs ) {
          process.nextTick( output, ...values )
        }
      }
    }
  }
  removeOutput( output: Output<T> ): void {
    removeOne( this.outputs, output )
  }

  attachSource<U extends T>( source: Producer<U> ): void {
    source.addOutput( this.push )
  }

  addOutput( v: Output<T> ): void {
    this.outputs.push( v )
  }

  pipeTo( stream: iConsumer<T> ): iConsumer<T> {
    stream.attachSource( this )
    return stream
  }
}

export function map<In,Out>( tranformation: Transformation<In,Out> ): TransformStream<In, Out> {
  return new TransformStream( tranformation )
}

export function asyncMap<In,Out>( tranformation: AsyncTransformation<In,Out> ): AsyncTransformStream<In, Out> {
  return new AsyncTransformStream( tranformation )
}

export function delay<In>( ms: TimestampInMilliseconds ): AsyncTransformStream<In, In> {
  return new AsyncTransformStream( async ( value ) => {
    await sleep( ms )
    return value
  } )
}

export function tap<T>( cb: (( v:T ) => unknown) | (() => unknown) ): TapStream<T> {
  return new TapStream<T>( cb )
}

export function passthrough<T>(): Passthrough<T> {
  return new Passthrough()
}

export function filter<T,U>( predicate: (x:T | U) => x is U ): TransformStream<T|U,U>
export function filter<T>( predicate: (x:T) => boolean ): TransformStream<T,T>
export function filter( predicate: unknown ): TransformStream<unknown,unknown> {
  return new FilterStream( predicate as any ) as any
}

export function flow<T>( input: Producer<T>, output: iConsumer<T> ): void

export function flow<T,T1>(
  v0: Producer<T>,
  v1: iConsumer<T> & Producer<T1>,
  v2: iConsumer<T1> ): void

export function flow<T,T1,T2>(
  v0: Producer<T>,
  v1: iConsumer<T> & Producer<T1>,
  v2: iConsumer<T1> & Producer<T2>,
  v3: iConsumer<T2> ): void  

export function flow<T,T1,T2,T3>(
  v0: Producer<T>,
  v1: iConsumer<T> & Producer<T1>,
  v2: iConsumer<T1> & Producer<T2>,
  v3: iConsumer<T2> & Producer<T3>,
  v4: iConsumer<T3> ): void  

export function flow<T,T1,T2,T3,T4>(
  v0: Producer<T>,
  v1: iConsumer<T> & Producer<T1>,
  v2: iConsumer<T1> & Producer<T2>,
  v3: iConsumer<T2> & Producer<T3>,
  v4: iConsumer<T3> & Producer<T4>,
  v5: iConsumer<T4> ): void  

export function flow<T,T1,T2,T3,T4,T5>(
  v0: Producer<T>,
  v1: iConsumer<T> & Producer<T1>,
  v2: iConsumer<T1> & Producer<T2>,
  v3: iConsumer<T2> & Producer<T3>,
  v4: iConsumer<T3> & Producer<T4>,
  v5: iConsumer<T4> & Producer<T5>,
  v6: iConsumer<T5> ): void  

export function flow<T,T1,T2,T3,T4,T5,T6>(
  v0: Producer<T>,
  v1: iConsumer<T> & Producer<T1>,
  v2: iConsumer<T1> & Producer<T2>,
  v3: iConsumer<T2> & Producer<T3>,
  v4: iConsumer<T3> & Producer<T4>,
  v5: iConsumer<T4> & Producer<T5>,
  v6: iConsumer<T5> & Producer<T6>,
  v7: iConsumer<T6> ): void  

export function flow( ...args: unknown[] ): void {
  let input: Producer<unknown> = args[0] as Producer<unknown>
  let output: iConsumer<unknown> = args[1] as iConsumer<unknown>
  let i = 1
  while ( i < args.length ) {
    output.attachSource( input )
    i++
    input = output as unknown as Producer<unknown>
    output = args[i] as iConsumer<unknown>
  }
}

