入出力の中継
Posted feedbacks - Flatten
Nested Hidden
ファイルハンドル難しい><
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | #!/usr/local/bin/perl
use strict;
use warnings;
use IPC::Open2;
use POSIX qw( :sys_wait_h );
use Fcntl qw( F_GETFL F_SETFL O_NONBLOCK );
my ( $fd_from_a, $fd_to_a, $fd_from_b, $fd_to_b );
$SIG{CHLD} = \&handler_SIGCHLD;
my @pids;
push @pids, open2( $fd_from_a, $fd_to_a, '/usr/pkg/sbin/nc -l -p 60001' );
push @pids, open2( $fd_from_b, $fd_to_b, '/usr/pkg/sbin/nc -l -p 60002' );
my $old_flags = fcntl $fd_from_a, F_GETFL, 0;
fcntl $fd_from_a, F_SETFL, $old_flags | O_NONBLOCK;
$old_flags = fcntl $fd_from_b, F_GETFL, 0;
fcntl $fd_from_b, F_SETFL, $old_flags | O_NONBLOCK;
while(1) {
sleep 1;
print $fd_to_b ( <$fd_from_a> );
print $fd_to_a ( <$fd_from_b> );
}
kill 1, @pids;
sub handler_SIGCHLD
{
kill 1, @pids;
exit;
}
|
あうあう、プログラムハードコードしてた><
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | #!/usr/local/bin/perl
use strict;
use warnings;
use IPC::Open2;
use POSIX qw( :sys_wait_h );
use Fcntl qw( F_GETFL F_SETFL O_NONBLOCK );
my ( $fd_from_a, $fd_to_a, $fd_from_b, $fd_to_b );
$SIG{CHLD} = \&handler_SIGCHLD;
my @pids;
if ( @ARGV < 2 ) {
die "$0 <cmd1> <cmd2>\n";
}
push @pids, open2( $fd_from_a, $fd_to_a, $ARGV[0] );
push @pids, open2( $fd_from_b, $fd_to_b, $ARGV[1] );
my $old_flags = fcntl $fd_from_a, F_GETFL, 0;
fcntl $fd_from_a, F_SETFL, $old_flags | O_NONBLOCK;
$old_flags = fcntl $fd_from_b, F_GETFL, 0;
fcntl $fd_from_b, F_SETFL, $old_flags | O_NONBLOCK;
while(1) {
sleep 1;
print $fd_to_b ( <$fd_from_a> );
print $fd_to_a ( <$fd_from_b> );
}
kill 1, @pids;
sub handler_SIGCHLD
{
kill 1, @pids;
exit;
}
|
正統的にやってみたつもりです
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import java.io.*;
public class Sample extends Thread {
Process pIn, pOut;
public static void main(String[] args) throws IOException {
Runtime r = Runtime.getRuntime();
Process p1 = r.exec(args[0]);
Process p2 = r.exec(args[1]);
new Sample(p1, p2).start();
new Sample(p2, p1).start();
}
Sample (Process in, Process out) {
pIn = in;
pOut = out;
}
public void run() {
try {
InputStream in = pIn.getInputStream();
OutputStream out = pOut.getOutputStream();
int c;
while ((c = in.read()) != -1) {
out.write(c);
}
pIn.waitFor();
pOut.destroy();
}
catch (Exception e) {
// e.printStackTrace();
}
}
}
|
楽をしようとsubprocessモジュールを使ったけど、どうもかわいくない。 まじめにselectでもした方がよかったかも。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import sys, os, signal
from subprocess import Popen, PIPE
p1 = Popen(sys.argv[1], shell=True, stdin=PIPE, stdout=PIPE)
p2 = Popen(sys.argv[2], shell=True, stdin=p1.stdout, stdout=PIPE)
try:
while p1.poll() == None and p2.poll() == None:
p1.stdin.write(p2.stdout.readline())
except IOError:
pass
if p1.poll() == None:
os.kill(p1.pid, signal.SIGTERM)
p1.wait()
if p2.poll() == None:
os.kill(p2.pid, signal.SIGTERM)
p2.wait()
|
SIGCHLDがかわいくない。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | import sys, os, signal
from subprocess import Popen, PIPE
p1 = Popen(sys.argv[1], shell=True, stdin=PIPE, stdout=PIPE)
p2 = Popen(sys.argv[2], shell=True, stdin=p1.stdout, stdout=p1.stdin)
def cleanup(sig, stack):
if p1.poll() == None:
os.kill(p1.pid, signal.SIGTERM)
if p2.poll() == None:
os.kill(p2.pid, signal.SIGTERM)
signal.signal(signal.SIGCHLD, cleanup)
signal.pause()
|
確認用にファイルに出力している
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | using System;
using System.Diagnostics;
using System.IO;
using System.Reflection;
using System.Threading;
class Program
{
static void Main(string[] args)
{
Assembly asm = Assembly.GetExecutingAssembly();
Directory.SetCurrentDirectory(Path.GetDirectoryName(asm.Location));
string exe = Path.GetFileName(asm.Location);
if (args.Length == 0)
{
Process.Start(asm.Location, string.Format("\"{0} -1\" \"{0} -2\"", exe));
}
else if (args.Length == 1)
{
if (args[0] == "-1")
{
using (StreamWriter sw = new StreamWriter("res1.txt"))
{
Console.WriteLine("一郎: こんちは");
sw.WriteLine("一郎: こんちは");
string s = Console.ReadLine();
Console.WriteLine("一郎: よろしく > " + s);
sw.WriteLine("一郎: よろしく > " + s);
}
}
else
{
using (StreamWriter sw = new StreamWriter("res2.txt"))
{
string s = Console.ReadLine();
Console.WriteLine("二郎: よろしく > " + s);
sw.WriteLine("二郎: よろしく > " + s);
}
}
}
else if (args.Length == 2)
{
Process p1 = new Process();
Process p2 = new Process();
string[] ss1 = args[0].Split();
string[] ss2 = args[1].Split();
p1.StartInfo.FileName = ss1[0];
p2.StartInfo.FileName = ss2[0];
p1.StartInfo.Arguments = string.Join(" ", ss1, 1, ss1.Length - 1);
p2.StartInfo.Arguments = string.Join(" ", ss2, 1, ss1.Length - 1);
p1.StartInfo.UseShellExecute = p2.StartInfo.UseShellExecute = false;
p1.StartInfo.RedirectStandardInput = p1.StartInfo.RedirectStandardOutput = true;
p2.StartInfo.RedirectStandardInput = p2.StartInfo.RedirectStandardOutput = true;
p1.Start();
p2.Start();
new Thread(delegate()
{
string s;
while ((s = p1.StandardOutput.ReadLine()) != null) p2.StandardInput.WriteLine(s);
}).Start();
new Thread(delegate()
{
string s;
while ((s = p2.StandardOutput.ReadLine()) != null) p1.StandardInput.WriteLine(s);
}).Start();
p1.WaitForExit();
p2.WaitForExit();
}
}
}
|
本来はPIPEを作ってやるべきだろうと思い直しました。Javaでは無理そうなので、C を使います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | #include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
int main(int argc, char *argv[])
{
int p1[2], p2[2];
pid_t pid1, pid2, p;
int stat;
pipe(p1);
pipe(p2);
pid1 = fork();
if (pid1 == 0) { // child process 1
dup2(p1[0], 0);
close(p1[0]);
close(p1[1]);
dup2(p2[1], 1);
close(p2[0]);
close(p2[1]);
execl(argv[1], argv[1], NULL);
exit(2);
}
pid2 = fork();
if (pid2 == 0) { // child process 2
dup2(p2[0], 0);
close(p2[0]);
close(p2[1]);
dup2(p1[1], 1);
close(p1[0]);
close(p1[1]);
execl(argv[2], argv[2], NULL);
exit(2);
}
close(p2[0]); // parent process
close(p2[1]);
close(p1[0]);
close(p1[1]);
p = wait(&stat);
if (p == pid1)
kill(pid2, SIGINT);
else if (p == pid2)
kill(pid1, SIGINT);
p = wait(&stat);
return (0);
}
|
UNIX系(POSIX互換?)じゃないと動かないと思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | #include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>
int main(int argc, char *argv[])
{
pid_t a, b, c;
int pipe_a2b[2];
int pipe_b2a[2];
int status;
char *param[2] = {NULL, NULL};
if (pipe(pipe_a2b) == -1) { perror("pipe_a2b"); }
if (pipe(pipe_b2a) == -1) { perror("pipe_b2a"); }
a = fork();
param[0] = argv[1];
if (a == 0) {
close(pipe_a2b[0]);
close(pipe_b2a[1]);
dup2(pipe_b2a[0], 0);
dup2(pipe_a2b[1], 1);
close(pipe_b2a[0]);
close(pipe_a2b[1]);
execvp(argv[1], param);
}
b = fork();
param[0] = argv[2];
if (b == 0) {
close(pipe_b2a[0]);
close(pipe_a2b[1]);
dup2(pipe_a2b[0], 0);
dup2(pipe_b2a[1], 1);
close(pipe_a2b[0]);
close(pipe_b2a[1]);
execvp(argv[2], param);
}
c = wait(&status);
if (a == c) {
kill(b, SIGTERM);
}
else {
kill(a, SIGTERM);
}
return 0;
}
|
SIGINTではなくSIGTERMを使うべきらしい……
ロバストにするのはもっと面倒だけど。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | (use gauche.threads)
(use gauche.process)
(use srfi-42)
(define (main args)
(unless (= (length args) 3) (error "needs two arguments"))
(let* ((ps (map (cut run-process <> :input :pipe :output :pipe) (cdr args)))
(is (map process-output ps))
(os (map process-input ps)))
(define (finish sig)
(do-ec (: p ps)
(guard (e (else #t)) (process-send-signal p SIGTERM))) (exit 0))
(do-ec (: ss (list is os)) (: p ss) (set! (port-buffering p) :none))
(set-signal-handler! SIGCHLD finish)
(set-signal-handler! SIGPIPE finish)
(do-ec (:parallel (: i is) (: o (reverse os)))
(thread-start! (make-thread (lambda () (copy-port i o)))))
(sys-pause)))
|
OSProcess インストール済みの Squeak Smalltalk で。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | | pipe1 pipe2 procA procB semaA semaB |
OSProcess accessor canAccessSystem ifFalse: [^self].
pipe1 := OSPipe nonBlockingPipe.
pipe2 := OSPipe nonBlockingPipe.
procA := UnixProcess
forkJob: 'a'
arguments: nil
environment: nil
descriptors: {pipe1 reader. pipe2 writer. nil}.
procB := UnixProcess
forkJob: 'b'
arguments: nil
environment: nil
descriptors: {pipe2 reader. pipe1 writer. nil}.
semaA := Semaphore new.
semaB := Semaphore new.
[[Processor yield. procA isComplete] whileFalse. procB sigterm. semaA signal] fork.
[[Processor yield. procB isComplete] whileFalse. procA sigterm. semaB signal] fork.
semaA wait.
semaB wait.
pipe1 close.
pipe2 close
|
Rubyを100%にするためにとりあえず。あまりまじめに書いてません……
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | def pipe(f, g, th=nil)
while line = g.gets
f.puts line
end
th.exit if th
end
cmd1, cmd2 = ARGV
f1 = IO.popen(cmd1, "r+")
f2 = IO.popen(cmd2, "r+")
begin
th1 = Thread.new { pipe f1, f2 }
th2 = Thread.new { pipe f2, f1, th1 }
th1.join
rescue
end
|
厳密にはプロセス監視してないけど、動作は同じということでひとつ。。 Bが終了した場合の動作は、OSに依存するかもしれない。
1 2 3 4 5 6 7 8 9 10 11 | BEGIN {
A = ARGV[1]
B = ARGV[2]
pipe(A, B)
}
function pipe(from, to) {
while(from | getline > 0)
print | to
close(from)
close(to)
}
|
AllegroCL + Windows ですが、fork もシグナルも無いのでこれくらいで許してください…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | (defun pipe (cmd1 cmd2)
(let ((A (run-shell-command cmd1 :input :stream :output :stream :separate-streams nil :wait nil))
(B (run-shell-command cmd2 :input :stream :output :stream :separate-streams nil :wait nil))
(continue? T))
(labels ((copy-stream (from to)
(mp:waiting-for-input-available (from)
(loop while continue?
for c = (ignore-errors (read-char from nil :eof)) do
(when (eql c :eof)
(close to)
(setf continue? nil)
(return))
(write-char c to)
(force-output to)))))
(mp:process-run-function "A -> B" #'copy-stream A B)
(mp:process-run-function "B -> A" #'copy-stream B A)
(loop while continue? do (mp:process-allow-schedule)))))
(pipe "c:\\usr\\bin\\cat.exe z:\\home\\.wgetrc"
"perl -MIO::File -e \"$io=IO::File->new('>c:\\home\\output.txt');while(<>){print $_;print $io $_};exit 0;\"")
|
むずかしい・・・
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | import java.io._
class LoopTread(in:Process,out:Process) extends Thread {
val _pin = in
val _pout = out
override def run = {
val in = _pin.getInputStream
val out = _pout.getOutputStream
var c = in.read
def lambda():Unit = {
try { _pin.exitValue }catch {
case ex:IllegalThreadStateException =>
in.read match {
case -1 => lambda
case i => out.write(i);lambda
}
}
_pin.waitFor
_pout.destroy
};lambda
}
}
def loopProcess(a:String, b:String) = {
val p1 = Runtime.getRuntime.exec(a)
val p2 = Runtime.getRuntime.exec(b)
new LoopTread(p1, p2).start
new LoopTread(p2, p1).start
}
|
手抜きして要らないfd閉じなかったけど 本当は閉じなきゃ駄目かも。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | import System
import System.Posix.IO
import System.Posix.Process
import System.Posix.Signals
import System.Posix.Types
main = do [cmdA, cmdB] <- getArgs
(rPipeAtoB, wPipeAtoB) <- createPipe
(rPipeBtoA, wPipeBtoA) <- createPipe
pidA <- runChild cmdA rPipeBtoA wPipeAtoB
pidB <- runChild cmdB rPipeAtoB wPipeBtoA
Just (pid, _) <- getAnyProcessStatus True False
signalProcess softwareTermination (if pid == pidA then
pidB
else
pidA)
runChild :: String -> Fd -> Fd -> IO ProcessID
runChild cmd rPipe wPipe
= forkProcess $
do dupTo rPipe stdInput
dupTo wPipe stdOutput
executeFile cmd True [] Nothing
|
1 2 3 4 5 6 7 8 9 | function relay(A, B){
rt = Runtime.getRuntime()
a = rt.exec(A)
b = rt.exec(B)
t1 = fork({->read(a.inputStream, b.outputStream); a.waitFor(); b.destroy() })
t2 = fork({->read(b.inputStream, a.outputStream); b.waitFor(); a.destroy() })
t1.join()
t2.join()
}
|
Java による他の投稿とほぼ同じ内容。
WSH での挑戦は atEndOfStream が挙動不審なため断念。
WSH での挑戦は atEndOfStream が挙動不審なため断念。
1 2 3 4 5 6 7 8 9 10 | function pipe(i, o){
for(var is = i.inputStream, os = o.outputStream, b; (b = is.read()) != -1; os.write(b));
os.close();
o.waitFor();
i.destroy();
}
if(arguments.length < 2) quit();
with(java.lang.Runtime.runtime) var pp = [exec(arguments[0]), exec(arguments[1])];
spawn(function(){ pipe(pp[0], pp[1]) });
spawn(function(){ pipe(pp[1], pp[0]) });
|
erlang勉強中です。 $ erlc ipc.erl $ erl -noshell -s ipc pipe "command1" "command2" と呼び出します。エラー処理,後処理等は一切省いてます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | -module(ipc).
-export([pipe/1]).
pipe([Cmd1, Cmd2]) ->
loop(
open_port({spawn, Cmd1}, [in]),
open_port({spawn, Cmd2}, [out])
).
loop(P1, P2) ->
receive
{P1, {data, D}} ->
port_command(P2, D),
loop(P1, P2)
end.
|
FIFO(名前つきパイプ)を使いました。
1 2 3 4 5 6 7 8 9 | #!/bin/bash
cmd_a=$1
cmd_b=$2
PIPE_B2A=$(mktemp -u /tmp/pipe_b2a.XXXXXXXXXX)
trap "rm -f $PIPE_B2A; exit" EXIT
mkfifo $PIPE_B2A
$cmd_a < $PIPE_B2A | $cmd_b > $PIPE_B2A
|






にしお
#3367()
Rating2/2=1.00
[ reply ]